NIFI-3670 - Expose the control of ListenSyslog's CLIENT_AUTH property to DFM

This closes #1720.

Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
Andre F de Miranda 2017-05-01 22:32:51 +10:00 committed by Bryan Bende
parent a9a9b67430
commit da6ad4f3bc
No known key found for this signature in database
GPG Key ID: A0DDA9ED50711C39
1 changed files with 72 additions and 41 deletions

View File

@ -73,6 +73,7 @@ import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.syslog.SyslogAttributes; import org.apache.nifi.processors.standard.syslog.SyslogAttributes;
import org.apache.nifi.processors.standard.syslog.SyslogEvent; import org.apache.nifi.processors.standard.syslog.SyslogEvent;
import org.apache.nifi.processors.standard.syslog.SyslogParser; import org.apache.nifi.processors.standard.syslog.SyslogParser;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService;
@SupportsBatching @SupportsBatching
@ -104,6 +105,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Message Queue") .name("Max Size of Message Queue")
.displayName("Max Size of Message Queue")
.description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " +
"Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " +
"memory used by the processor.") "memory used by the processor.")
@ -113,6 +115,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Receive Buffer Size") .name("Receive Buffer Size")
.displayName("Receive Buffer Size")
.description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " + .description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " +
"incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " + "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " +
"from an incoming connection until the buffer is full, or the connection is closed. ") "from an incoming connection until the buffer is full, or the connection is closed. ")
@ -122,6 +125,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Size of Socket Buffer") .name("Max Size of Socket Buffer")
.displayName("Max Size of Socket Buffer")
.description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
"to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " + "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
"the data can be read, and incoming data will be dropped.") "the data can be read, and incoming data will be dropped.")
@ -131,6 +135,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Number of TCP Connections") .name("Max Number of TCP Connections")
.displayName("Max Number of TCP Connections")
.description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") .description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.")
.addValidator(StandardValidators.createLongValidator(1, 65535, true)) .addValidator(StandardValidators.createLongValidator(1, 65535, true))
.defaultValue("2") .defaultValue("2")
@ -138,6 +143,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Max Batch Size") .name("Max Batch Size")
.displayName("Max Batch Size")
.description( .description(
"The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with " "The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with "
+ "the <Message Delimiter> up to this configured maximum number of messages") + "the <Message Delimiter> up to this configured maximum number of messages")
@ -148,6 +154,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
.name("Message Delimiter") .name("Message Delimiter")
.displayName("Message Delimiter")
.description("Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).") .description("Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\n") .defaultValue("\\n")
@ -155,6 +162,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder() public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder()
.name("Parse Messages") .name("Parse Messages")
.displayName("Parse Messages")
.description("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only " + .description("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only " +
"contain the sender, protocol, and port, and no additional attributes.") "contain the sender, protocol, and port, and no additional attributes.")
.allowableValues("true", "false") .allowableValues("true", "false")
@ -163,11 +171,20 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.build(); .build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service") .name("SSL Context Service")
.displayName("SSL Context Service")
.description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " +
"messages will be received over a secure connection.") "messages will be received over a secure connection.")
.required(false) .required(false)
.identifiesControllerService(SSLContextService.class) .identifiesControllerService(SSLContextService.class)
.build(); .build();
public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
.name("Client Auth")
.displayName("Client Auth")
.description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
.required(false)
.allowableValues(SSLContextService.ClientAuth.values())
.defaultValue(SSLContextService.ClientAuth.REQUIRED.name())
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder() public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success") .name("success")
@ -195,6 +212,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
descriptors.add(PORT); descriptors.add(PORT);
descriptors.add(NETWORK_INTF_NAME); descriptors.add(NETWORK_INTF_NAME);
descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(SSL_CONTEXT_SERVICE);
descriptors.add(CLIENT_AUTH);
descriptors.add(RECV_BUFFER_SIZE); descriptors.add(RECV_BUFFER_SIZE);
descriptors.add(MAX_MESSAGE_QUEUE_SIZE); descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
descriptors.add(MAX_SOCKET_BUFFER_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE);
@ -252,6 +270,15 @@ public class ListenSyslog extends AbstractSyslogProcessor {
.valid(false).subject("SSL Context").build()); .valid(false).subject("SSL Context").build());
} }
// Validate CLIENT_AUTH
final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
results.add(new ValidationResult.Builder()
.explanation("Client Auth must be provided when using TLS/SSL")
.valid(false).subject("Client Auth").build());
}
return results; return results;
} }
@ -290,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
// create either a UDP or TCP reader and call open() to bind to the given port // create either a UDP or TCP reader and call open() to bind to the given port
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
channelDispatcher = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); channelDispatcher = createChannelReader(context, protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet));
channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize); channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize);
final Thread readerThread = new Thread(channelDispatcher); final Thread readerThread = new Thread(channelDispatcher);
@ -305,7 +332,7 @@ public class ListenSyslog extends AbstractSyslogProcessor {
} }
// visible for testing to be overridden and provide a mock ChannelDispatcher if desired // visible for testing to be overridden and provide a mock ChannelDispatcher if desired
protected ChannelDispatcher createChannelReader(final String protocol, final BlockingQueue<ByteBuffer> bufferPool, protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final BlockingQueue<ByteBuffer> bufferPool,
final BlockingQueue<RawSyslogEvent> events, final int maxConnections, final BlockingQueue<RawSyslogEvent> events, final int maxConnections,
final SSLContextService sslContextService, final Charset charset) throws IOException { final SSLContextService sslContextService, final Charset charset) throws IOException {
@ -316,12 +343,16 @@ public class ListenSyslog extends AbstractSyslogProcessor {
} else { } else {
// if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
SSLContext sslContext = null; SSLContext sslContext = null;
SslContextFactory.ClientAuth clientAuth = null;
if (sslContextService != null) { if (sslContextService != null) {
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue));
clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue);
} }
final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); final ChannelHandlerFactory<RawSyslogEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charset); return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charset);
} }
} }