diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java index 54d516f903..ac874d5351 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenSyslog.java @@ -73,6 +73,7 @@ import org.apache.nifi.processor.util.listen.response.ChannelResponder; import org.apache.nifi.processors.standard.syslog.SyslogAttributes; import org.apache.nifi.processors.standard.syslog.SyslogEvent; import org.apache.nifi.processors.standard.syslog.SyslogParser; +import org.apache.nifi.security.util.SslContextFactory; import org.apache.nifi.ssl.SSLContextService; @SupportsBatching @@ -103,44 +104,49 @@ import org.apache.nifi.ssl.SSLContextService; public class ListenSyslog extends AbstractSyslogProcessor { public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder() - .name("Max Size of Message Queue") - .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + + .name("Max Size of Message Queue") + .displayName("Max Size of Message Queue") + .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " + "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " + "memory used by the processor.") - .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) - .defaultValue("10000") - .required(true) - .build(); + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .defaultValue("10000") + .required(true) + .build(); public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Receive Buffer Size") - .description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " + + .name("Receive Buffer Size") + .displayName("Receive Buffer Size") + .description("The size of each buffer used to receive Syslog messages. Adjust this value appropriately based on the expected size of the " + "incoming Syslog messages. When UDP is selected each buffer will hold one Syslog message. When TCP is selected messages are read " + "from an incoming connection until the buffer is full, or the connection is closed. ") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("65507 B") - .required(true) - .build(); + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("65507 B") + .required(true) + .build(); public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder() - .name("Max Size of Socket Buffer") - .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + + .name("Max Size of Socket Buffer") + .displayName("Max Size of Socket Buffer") + .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " + "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " + "the data can be read, and incoming data will be dropped.") - .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) - .defaultValue("1 MB") - .required(true) - .build(); + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .defaultValue("1 MB") + .required(true) + .build(); public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder() - .name("Max Number of TCP Connections") - .description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") - .addValidator(StandardValidators.createLongValidator(1, 65535, true)) - .defaultValue("2") - .required(true) - .build(); + .name("Max Number of TCP Connections") + .displayName("Max Number of TCP Connections") + .description("The maximum number of concurrent connections to accept Syslog messages in TCP mode.") + .addValidator(StandardValidators.createLongValidator(1, 65535, true)) + .defaultValue("2") + .required(true) + .build(); public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder() .name("Max Batch Size") + .displayName("Max Batch Size") .description( "The maximum number of Syslog events to add to a single FlowFile. If multiple events are available, they will be concatenated along with " - + "the up to this configured maximum number of messages") + + "the up to this configured maximum number of messages") .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .expressionLanguageSupported(false) .defaultValue("1") @@ -148,6 +154,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { .build(); public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() .name("Message Delimiter") + .displayName("Message Delimiter") .description("Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see property).") .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .defaultValue("\\n") @@ -155,28 +162,38 @@ public class ListenSyslog extends AbstractSyslogProcessor { .build(); public static final PropertyDescriptor PARSE_MESSAGES = new PropertyDescriptor.Builder() .name("Parse Messages") + .displayName("Parse Messages") .description("Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only " + - "contain the sender, protocol, and port, and no additional attributes.") + "contain the sender, protocol, and port, and no additional attributes.") .allowableValues("true", "false") .defaultValue("true") .required(true) .build(); public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() - .name("SSL Context Service") - .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " + + .name("SSL Context Service") + .displayName("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, syslog " + "messages will be received over a secure connection.") - .required(false) - .identifiesControllerService(SSLContextService.class) - .build(); + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("Client Auth") + .displayName("Client Auth") + .description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue(SSLContextService.ClientAuth.REQUIRED.name()) + .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.") - .build(); + .name("success") + .description("Syslog messages that match one of the expected formats will be sent out this relationship as a FlowFile per message.") + .build(); public static final Relationship REL_INVALID = new Relationship.Builder() - .name("invalid") - .description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.") - .build(); + .name("invalid") + .description("Syslog messages that do not match one of the expected formats will be sent out this relationship as a FlowFile per message.") + .build(); private Set relationships; private List descriptors; @@ -195,6 +212,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { descriptors.add(PORT); descriptors.add(NETWORK_INTF_NAME); descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(CLIENT_AUTH); descriptors.add(RECV_BUFFER_SIZE); descriptors.add(MAX_MESSAGE_QUEUE_SIZE); descriptors.add(MAX_SOCKET_BUFFER_SIZE); @@ -252,6 +270,15 @@ public class ListenSyslog extends AbstractSyslogProcessor { .valid(false).subject("SSL Context").build()); } + // Validate CLIENT_AUTH + final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue(); + if (sslContextService != null && StringUtils.isBlank(clientAuth)) { + results.add(new ValidationResult.Builder() + .explanation("Client Auth must be provided when using TLS/SSL") + .valid(false).subject("Client Auth").build()); + } + + return results; } @@ -290,7 +317,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { // create either a UDP or TCP reader and call open() to bind to the given port final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); - channelDispatcher = createChannelReader(protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); + channelDispatcher = createChannelReader(context, protocol, bufferPool, syslogEvents, maxConnections, sslContextService, Charset.forName(charSet)); channelDispatcher.open(nicIPAddress, port, maxChannelBufferSize); final Thread readerThread = new Thread(channelDispatcher); @@ -305,7 +332,7 @@ public class ListenSyslog extends AbstractSyslogProcessor { } // visible for testing to be overridden and provide a mock ChannelDispatcher if desired - protected ChannelDispatcher createChannelReader(final String protocol, final BlockingQueue bufferPool, + protected ChannelDispatcher createChannelReader(final ProcessContext context, final String protocol, final BlockingQueue bufferPool, final BlockingQueue events, final int maxConnections, final SSLContextService sslContextService, final Charset charset) throws IOException { @@ -316,12 +343,16 @@ public class ListenSyslog extends AbstractSyslogProcessor { } else { // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher SSLContext sslContext = null; + SslContextFactory.ClientAuth clientAuth = null; + if (sslContextService != null) { - sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue(); + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.valueOf(clientAuthValue)); + clientAuth = SslContextFactory.ClientAuth.valueOf(clientAuthValue); } final ChannelHandlerFactory, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>(); - return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charset); + return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, clientAuth, charset); } }