NIFI-7401 Add ZooKeeper client TLS to CuratorLeaderElectionManager

NIFI-7401 Rebased to 1.13.0-SNAPSHOT and simplified tests

NIFI-7401 Added keystore types and changed properties to match nifi.security.*

NIFI-7401 Removed dead code from SecureClientZooKeeperFactory test

NIFI-7401 Renamed bean methods, moved helper code into NiFiProperties

NIFI-7401 Changed connection socket constants to use .class.getName()

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #4592.
This commit is contained in:
Joey Frazee 2020-04-28 11:26:10 -05:00 committed by Nathan Gough
parent b442df58e2
commit 55cb8d73cb
9 changed files with 620 additions and 19 deletions

View File

@ -227,6 +227,13 @@ public abstract class NiFiProperties {
public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
public static final String ZOOKEEPER_CLIENT_SECURE = "nifi.zookeeper.client.secure";
public static final String ZOOKEEPER_SECURITY_KEYSTORE = "nifi.zookeeper.security.keystore";
public static final String ZOOKEEPER_SECURITY_KEYSTORE_TYPE = "nifi.zookeeper.security.keystoreType";
public static final String ZOOKEEPER_SECURITY_KEYSTORE_PASSWD = "nifi.zookeeper.security.keystorePasswd";
public static final String ZOOKEEPER_SECURITY_TRUSTSTORE = "nifi.zookeeper.security.truststore";
public static final String ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE = "nifi.zookeeper.security.truststoreType";
public static final String ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD = "nifi.zookeeper.security.truststorePasswd";
public static final String ZOOKEEPER_AUTH_TYPE = "nifi.zookeeper.auth.type";
public static final String ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "nifi.zookeeper.kerberos.removeHostFromPrincipal";
public static final String ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "nifi.zookeeper.kerberos.removeRealmFromPrincipal";
@ -284,6 +291,7 @@ public abstract class NiFiProperties {
public static final String DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
public static final boolean DEFAULT_ZOOKEEPER_CLIENT_SECURE = false;
public static final String DEFAULT_ZOOKEEPER_AUTH_TYPE = "default";
public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "true";
public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "true";
@ -1313,6 +1321,20 @@ public abstract class NiFiProperties {
return networkInterfaces;
}
/**
* @return True if property value is 'true'; False if property value is 'false'. Throws an exception otherwise.
*/
public boolean isZooKeeperClientSecure() {
final String defaultValue = String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_SECURE);
final String clientSecure = getProperty(ZOOKEEPER_CLIENT_SECURE, defaultValue).trim();
if (!"true".equalsIgnoreCase(clientSecure) && !"false".equalsIgnoreCase(clientSecure)) {
throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_SECURE, clientSecure));
}
return Boolean.parseBoolean(clientSecure);
}
public int size() {
return getPropertyKeys().size();
}

View File

@ -37,7 +37,7 @@
<module>nifi-record-path</module>
<module>nifi-rocksdb-utils</module>
<module>nifi-schema-utils</module>
<module>nifi-security-utils-api</module>
<module>nifi-security-utils-api</module>
<module>nifi-security-utils</module>
<module>nifi-site-to-site-client</module>
<module>nifi-socket-utils</module>

View File

@ -1973,24 +1973,25 @@ that indicates that any user is allowed to have full permissions to the data, or
allowed to access the data. Which ACL is used depends on the value of the `Access Control` property for the `ZooKeeperStateProvider` (see the
<<state_providers>> section for more information).
In order to use an ACL that indicates that only the Creator is allowed to access the data, we need to tell ZooKeeper who the Creator is. There are two
In order to use an ACL that indicates that only the Creator is allowed to access the data, we need to tell ZooKeeper who the Creator is. There are three
mechanisms for accomplishing this. The first mechanism is to provide authentication using Kerberos. See <<zk_kerberos_client>> for more information.
The second option is to use a user name and password. This is configured by specifying a value for the `Username` and a value for the `Password` properties
The second option, which additionally ensures that network communication is encrypted, is to authenticate using an X.509 certificate on a TLS-enabled ZooKeeper
server. See <<zk_tls_client>> for more information.
The third option is to use a username and password. This is configured by specifying a value for the `Username` and a value for the `Password` properties
for the `ZooKeeperStateProvider` (see the <<state_providers>> section for more information). The important thing to keep in mind here, though, is that ZooKeeper
will pass around the password in plain text. This means that using a user name and password should not be used unless ZooKeeper is running on localhost as a
one-instance cluster, or if communications with ZooKeeper occur only over encrypted communications, such as a VPN or an SSL connection. ZooKeeper will be
providing support for SSL connections in version 3.5.0.
will pass around the password in plain text. This means that using a username and password should not be used unless ZooKeeper is running on localhost as a
one-instance cluster, or if communications with ZooKeeper occur only over encrypted communications, such as a VPN or an SSL connection.
[[securing_zookeeper]]
=== Securing ZooKeeper
=== Securing ZooKeeper with Kerberos
When NiFi communicates with ZooKeeper, all communications, by default, are non-secure, and anyone who logs into ZooKeeper is able to view and manipulate all
of the NiFi state that is stored in ZooKeeper. To prevent this, we can use Kerberos to manage the authentication. At this time, ZooKeeper does not provide
support for encryption via SSL. Support for SSL in ZooKeeper is being actively developed and is expected to be available in the 3.5.x release version.
of the NiFi state that is stored in ZooKeeper. To prevent this, one option is to use Kerberos to manage authentication.
In order to secure the communications, we need to ensure that both the client and the server support the same configuration. Instructions for configuring the
In order to secure the communications with Kerberos, we need to ensure that both the client and the server support the same configuration. Instructions for configuring the
NiFi ZooKeeper client and embedded ZooKeeper server to use Kerberos are provided below.
If Kerberos is not already setup in your environment, you can find information on installing and setting up a Kerberos Server at
@ -2181,6 +2182,46 @@ java.arg.16=-Dsun.security.krb5.debug=true
This will cause the debug output to be written to the NiFi Bootstrap log file. By default, this is located at _$NIFI_HOME/logs/nifi-bootstrap.log_.
This output can be rather verbose but provides extremely valuable information for troubleshooting Kerberos failures.
[[zk_tls_client]]
=== Securing ZooKeeper with TLS
As discussed above, communications with ZooKeeper are insecure by default. The second option for securely authenticating to and communicating with ZooKeeper is to use
certificate-based authentication with a TLS-enabled ZooKeeper server (available since ZooKeeper's 3.5.x releases). Instructions for enabling TLS on an external
ZooKeeper ensemble can be found in the link:https://zookeeper.apache.org/doc/r3.5.5/zookeeperAdmin.html#sc_authOptions[ZooKeeper Administrator's Guide].
Once you have a TLS-enabled instance of ZooKeeper, TLS can be enabled for the NiFi client by creating a keystore and truststore and configuring the following properties
in the _$NIFI_HOME/conf/nifi.properties_ file:
[options="header,footer"]
|===
|Property Name|Description|Default
|`nifi.zookeeper.client.secure`|Whether to acccess ZooKeeper using client TLS.|false
|`nifi.zookeeper.security.keystore`|Filename of the Keystore containing the private key to use when communicating with ZooKeeper.|_none_
|`nifi.zookeeper.security.keystoreType`|Optional. The type of the Keystore. Must be `PKCS12`, `JKS`, or `PEM`. If not specified the type will be determined from the file extension (`.p12`, `.jks`, `.pem`).|_none_
|`nifi.zookeeper.security.keystorePasswd`|The password for the Keystore.|_none_
|`nifi.zookeeper.security.truststore`|Filename of the Truststore that will be used to verify the ZooKeeper server(s).|_none_
|`nifi.zookeeper.security.truststoreType`|Optional. The type of the Truststore. Must be `PKCS12`, `JKS`, or `PEM`. If not specified the type will be determined from the file extension (`.p12`, `.jks`, `.pem`).|_none_
|`nifi.zookeeper.security.truststorePasswd`|The password for the Truststore.|_none_
|===
These can be different from or identical to the configuration values in the `nifi.security.\*` security properties. If they are identical, the keystore and truststore must
still contain the appropriate keys and certificates for use with ZooKeeper (i.e., the keys and certificates need to align with the ZooKeeper configuration either way).
If not re-using the `nifi.security.*` keystore and truststore, NiFi's TLS Toolkit can still be used to help generate the keystore and truststore used for ZooKeeper client
access.
After updating the above properties and starting NiFi, network communication with ZooKeeper will be secure and ZooKeeper will now use the NiFi node's certificate principal
when authenticating access. This will be reflected in log messages like the following on the ZooKeeper server:
[source]
2020-02-24 23:37:52,671 [myid:2] - INFO [nioEventLoopGroup-4-1:X509AuthenticationProvider@172] - Authenticated Id 'CN=nifi-node1,OU=NIFI' for Scheme 'x509'
ZooKeeper uses Netty to support network encryption and certificate-based authentication. When TLS is enabled, both the ZooKeeper server and its clients must be configured to use Netty-based
connections instead of the default NIO implementations. This is configured automatically for NiFi when `nifi.zookeeper.client.secure` is set to
_true_. Once Netty is enabled, you should see log messages like the following in _$NIFI_HOME/logs/nifi-app.log_:
[source]
2020-02-24 23:37:54,082 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty SSL handler added for channel: [id: 0xa831f9c3]
2020-02-24 23:37:54,104 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty channel is connected: [id: 0xa831f9c3, L:/172.17.0.4:56510 - R:8e38869cd1d1/172.17.0.3:2281]
[[zookeeper_migrator]]
=== ZooKeeper Migrator
You can use the `zk-migrator` tool to perform the following tasks:
@ -3432,6 +3473,13 @@ for storing data. Each 'directory' in this structure is referred to as a ZNode.
that should be used for storing data. The default value is `/root`. This is important to set correctly, as which cluster
the NiFi instance attempts to join is determined by which ZooKeeper instance it connects to and the ZooKeeper Root Node
that is specified.
|`nifi.zookeeper.client.secure`|Whether to acccess ZooKeeper using client TLS.
|`nifi.zookeeper.security.keystore`|Filename of the Keystore containing the private key to use when communicating with ZooKeeper.
|`nifi.zookeeper.security.keystoreType`|Optional. The type of the Keystore. Must be `PKCS12`, `JKS`, or `PEM`. If not specified the type will be determined from the file extension (`.p12`, `.jks`, `.pem`).
|`nifi.zookeeper.security.keystorePasswd`|The password for the Keystore.
|`nifi.zookeeper.security.truststore`|Filename of the Truststore that will be used to verify the ZooKeeper server(s).
|`nifi.zookeeper.security.truststoreType`|Optional. The type of the Truststore. Must be `PKCS12`, `JKS`, or `PEM`. If not specified the type will be determined from the file extension (`.p12`, `.jks`, `.pem`).
|`nifi.zookeeper.security.truststorePasswd`|The password for the Truststore.
|====
[[kerberos_properties]]
@ -3711,7 +3759,7 @@ This section describes the original process for installing custom processors tha
Firstly, we will configure a directory for the custom processors. See <<processor-location-options>> for more about these configuration options.
nifi.nar.library.directory.myCustomLibs=./my-custom-nars/lib
Ensure that this directory exists and has appropriate permissions for the nifi user and group.
Now, we must place our custom processor nar in the configured directory. The configured directory is relative to the NiFi Home directory; for example, let us say that our NiFi Home Dir is `/var/lib/nifi`, we would place our custom processor nar in `/var/lib/nifi/my-custom-nars/lib`.
@ -3725,7 +3773,7 @@ Restart NiFi and the custom processor should now be available when adding a new
This section describes the process to use the Autoloading feature for custom processors.
To use the autoloading feature, the `nifi.nar.library.autoload.directory` property must be configured to point at the desired directory. By default, this points at `./extensions`.
To use the autoloading feature, the `nifi.nar.library.autoload.directory` property must be configured to point at the desired directory. By default, this points at `./extensions`.
For example:

View File

@ -30,6 +30,12 @@ import org.slf4j.LoggerFactory;
public class ZooKeeperClientConfig {
public static final String NETTY_CLIENT_CNXN_SOCKET =
org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();
public static final String NIO_CLIENT_CNXN_SOCKET =
org.apache.zookeeper.ClientCnxnSocketNIO.class.getName();
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClientConfig.class);
private static final Pattern PORT_PATTERN = Pattern.compile("[0-9]{1,5}");
@ -37,17 +43,33 @@ public class ZooKeeperClientConfig {
private final int sessionTimeoutMillis;
private final int connectionTimeoutMillis;
private final String rootPath;
private final boolean clientSecure;
private final String keyStore;
private final String keyStoreType;
private final String keyStorePassword;
private final String trustStore;
private final String trustStoreType;
private final String trustStorePassword;
private final String authType;
private final String authPrincipal;
private final String removeHostFromPrincipal;
private final String removeRealmFromPrincipal;
private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis, String rootPath,
String authType, String authPrincipal, String removeHostFromPrincipal, String removeRealmFromPrincipal) {
private ZooKeeperClientConfig(String connectString, int sessionTimeoutMillis, int connectionTimeoutMillis,
String rootPath, String authType, String authPrincipal, String removeHostFromPrincipal,
String removeRealmFromPrincipal, boolean clientSecure, String keyStore, String keyStoreType,
String keyStorePassword, String trustStore, String trustStoreType, String trustStorePassword) {
this.connectString = connectString;
this.sessionTimeoutMillis = sessionTimeoutMillis;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.rootPath = rootPath.endsWith("/") ? rootPath.substring(0, rootPath.length() - 1) : rootPath;
this.clientSecure = clientSecure;
this.keyStore = keyStore;
this.keyStoreType = keyStoreType;
this.keyStorePassword = keyStorePassword;
this.trustStore = trustStore;
this.trustStoreType = trustStoreType;
this.trustStorePassword = trustStorePassword;
this.authType = authType;
this.authPrincipal = authPrincipal;
this.removeHostFromPrincipal = removeHostFromPrincipal;
@ -70,6 +92,38 @@ public class ZooKeeperClientConfig {
return rootPath;
}
public boolean isClientSecure() {
return clientSecure;
}
public String getConnectionSocket() {
return (isClientSecure() ? NETTY_CLIENT_CNXN_SOCKET : NIO_CLIENT_CNXN_SOCKET);
}
public String getKeyStore() {
return keyStore;
}
public String getKeyStoreType() {
return keyStoreType;
}
public String getKeyStorePassword() {
return keyStorePassword;
}
public String getTrustStore() {
return trustStore;
}
public String getTrustStoreType() {
return trustStoreType;
}
public String getTrustStorePassword() {
return trustStorePassword;
}
public String getAuthType() {
return authType;
}
@ -107,7 +161,14 @@ public class ZooKeeperClientConfig {
final long sessionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT);
final long connectionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
final String rootPath = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
final String authType = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_AUTH_TYPE,NiFiProperties.DEFAULT_ZOOKEEPER_AUTH_TYPE);
final boolean clientSecure = nifiProperties.isZooKeeperClientSecure();
final String keyStore = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE);
final String keyStoreType = StringUtils.stripToNull(nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE));
final String keyStorePassword = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD);
final String trustStore = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE);
final String trustStoreType = StringUtils.stripToNull(nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE));
final String trustStorePassword = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD);
final String authType = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_AUTH_TYPE, NiFiProperties.DEFAULT_ZOOKEEPER_AUTH_TYPE);
final String authPrincipal = nifiProperties.getKerberosServicePrincipal();
final String removeHostFromPrincipal = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL,
NiFiProperties.DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL);
@ -120,7 +181,23 @@ public class ZooKeeperClientConfig {
throw new IllegalArgumentException("The '" + NiFiProperties.ZOOKEEPER_ROOT_NODE + "' property in nifi.properties is set to an illegal value: " + rootPath);
}
return new ZooKeeperClientConfig(cleanedConnectString, (int) sessionTimeoutMs, (int) connectionTimeoutMs, rootPath, authType, authPrincipal, removeHostFromPrincipal, removeRealmFromPrincipal);
return new ZooKeeperClientConfig(
cleanedConnectString,
(int) sessionTimeoutMs,
(int) connectionTimeoutMs,
rootPath,
authType,
authPrincipal,
removeHostFromPrincipal,
removeRealmFromPrincipal,
clientSecure,
keyStore,
keyStoreType,
keyStorePassword,
trustStore,
trustStoreType,
trustStorePassword
);
}
private static int getTimePeriod(final NiFiProperties nifiProperties, final String propertyName, final String defaultValue) {

View File

@ -26,6 +26,7 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter
import org.apache.curator.framework.recipes.leader.Participant;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.util.NiFiProperties;
@ -35,7 +36,13 @@ import org.apache.nifi.util.timebuffer.TimedBuffer;
import org.apache.nifi.util.timebuffer.TimestampedLong;
import org.apache.nifi.util.timebuffer.TimestampedLongAggregation;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.admin.ZooKeeperAdmin;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.common.PathUtils;
import org.apache.zookeeper.common.X509Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -381,14 +388,19 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
final RetryPolicy retryPolicy = new RetryNTimes(1, 100);
final CuratorACLProviderFactory aclProviderFactory = new CuratorACLProviderFactory();
final CuratorFramework client = CuratorFrameworkFactory.builder()
final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder()
.connectString(zkConfig.getConnectString())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
.retryPolicy(retryPolicy)
.aclProvider(aclProviderFactory.create(zkConfig))
.defaultData(new byte[0])
.build();
.defaultData(new byte[0]);
if (zkConfig.isClientSecure()) {
clientBuilder.zookeeperFactory(new SecureClientZooKeeperFactory(zkConfig));
}
final CuratorFramework client = clientBuilder.build();
client.start();
return client;
@ -589,4 +601,44 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
}
}
}
public static class SecureClientZooKeeperFactory implements ZookeeperFactory {
public static final String NETTY_CLIENT_CNXN_SOCKET =
org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();
private ZKClientConfig zkSecureClientConfig;
public SecureClientZooKeeperFactory(final ZooKeeperClientConfig zkConfig) {
this.zkSecureClientConfig = new ZKClientConfig();
// Netty is required for the secure client config.
final String cnxnSocket = zkConfig.getConnectionSocket();
if (!NETTY_CLIENT_CNXN_SOCKET.equals(cnxnSocket)) {
throw new IllegalArgumentException(String.format("connection factory set to '%s', %s required", String.valueOf(cnxnSocket), NETTY_CLIENT_CNXN_SOCKET));
}
zkSecureClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, cnxnSocket);
// This should never happen but won't get checked elsewhere.
final boolean clientSecure = zkConfig.isClientSecure();
if (!clientSecure) {
throw new IllegalStateException(String.format("%s set to '%b', expected true", ZKClientConfig.SECURE_CLIENT, clientSecure));
}
zkSecureClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, String.valueOf(clientSecure));
final X509Util clientX509util = new ClientX509Util();
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreLocationProperty(), zkConfig.getKeyStore());
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreTypeProperty(), zkConfig.getKeyStoreType());
zkSecureClientConfig.setProperty(clientX509util.getSslKeystorePasswdProperty(), zkConfig.getKeyStorePassword());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreLocationProperty(), zkConfig.getTrustStore());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreTypeProperty(), zkConfig.getTrustStoreType());
zkSecureClientConfig.setProperty(clientX509util.getSslTruststorePasswdProperty(), zkConfig.getTrustStorePassword());
}
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, zkSecureClientConfig);
}
}
}

View File

@ -17,9 +17,17 @@
package org.apache.nifi.cluster;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.properties.StandardNiFiProperties;
import org.apache.nifi.util.NiFiProperties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import org.junit.Test;
import java.util.Properties;
public class ZooKeeperClientConfigTest {
@Test
@ -71,4 +79,126 @@ public class ZooKeeperClientConfigTest {
public void testMultiValidOneNonsense(){
ZooKeeperClientConfig.cleanConnectString(" local : 1234 , local: 1235:wack,local :1295,local:14952 ");
}
@Test
public void testValidClientSecureTrue() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertTrue(zkClientConfig.isClientSecure());
assertEquals(zkClientConfig.getConnectionSocket(), ZooKeeperClientConfig.NETTY_CLIENT_CNXN_SOCKET);
}
@Test
public void testValidClientSecureFalse() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertFalse(zkClientConfig.isClientSecure());
assertEquals(zkClientConfig.getConnectionSocket(), ZooKeeperClientConfig.NIO_CLIENT_CNXN_SOCKET);
}
@Test
public void testValidClientSecureEmpty() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "");
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertFalse(zkClientConfig.isClientSecure());
assertEquals(zkClientConfig.getConnectionSocket(), ZooKeeperClientConfig.NIO_CLIENT_CNXN_SOCKET);
}
@Test
public void testValidClientSecureSpaces() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, " true ");
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertTrue(zkClientConfig.isClientSecure());
assertEquals(zkClientConfig.getConnectionSocket(), ZooKeeperClientConfig.NETTY_CLIENT_CNXN_SOCKET);
}
@Test
public void testValidClientSecureUpperCase() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "TRUE");
ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertTrue(zkClientConfig.isClientSecure());
assertEquals(zkClientConfig.getConnectionSocket(), ZooKeeperClientConfig.NETTY_CLIENT_CNXN_SOCKET);
}
@Test(expected = RuntimeException.class)
public void testInvalidClientSecure() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "meh");
ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
}
@Test
public void testKeyStoreTypes() {
final String storeType = "JKS";
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, storeType);
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, storeType);
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertEquals(storeType, zkClientConfig.getKeyStoreType());
assertEquals(storeType, zkClientConfig.getTrustStoreType());
}
@Test
public void testKeyStoreTypesSpaces() {
final String storeType = " JKS ";
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, storeType);
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, storeType);
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
final String expectedStoreType = "JKS";
assertEquals(expectedStoreType, zkClientConfig.getKeyStoreType());
assertEquals(expectedStoreType, zkClientConfig.getTrustStoreType());
}
@Test
public void testEmptyKeyStoreTypes() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "");
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertNull(zkClientConfig.getKeyStoreType());
assertNull(zkClientConfig.getTrustStoreType());
}
@Test
public void testBlankKeyStoreTypes() {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, " ");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, " ");
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
assertNull(zkClientConfig.getKeyStoreType());
assertNull(zkClientConfig.getTrustStoreType());
}
@Test
public void testValidCnxnSocketNames() {
assertEquals("org.apache.zookeeper.ClientCnxnSocketNetty", ZooKeeperClientConfig.NETTY_CLIENT_CNXN_SOCKET);
assertEquals("org.apache.zookeeper.ClientCnxnSocketNIO", ZooKeeperClientConfig.NIO_CLIENT_CNXN_SOCKET);
}
}

View File

@ -0,0 +1,258 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.leader.election;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.InstanceSpec;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.properties.StandardNiFiProperties;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager.SecureClientZooKeeperFactory;
import org.apache.nifi.security.util.CertificateUtils;
import org.apache.zookeeper.common.ClientX509Util;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.cert.CertificateException;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
public class TestSecureClientZooKeeperFactory {
private static final String NETTY_SERVER_CNXN_FACTORY =
"org.apache.zookeeper.server.NettyServerCnxnFactory";
private static final String KEYSTORE_TYPE = "PKCS12";
private static final String KEYSTORE_TYPE_EXT = ".p12";
private static final String TEST_PASSWORD = "testpass";
private static ZooKeeperServer zkServer;
private static ServerCnxnFactory serverConnectionFactory;
private static NiFiProperties clientProperties;
private static Path tempDir;
private static Path dataDir;
private static Path serverKeyStore;
private static Path serverTrustStore;
private static Path clientKeyStore;
private static Path clientTrustStore;
private static int clientPort;
@BeforeClass
public static void setup() throws IOException, GeneralSecurityException, InterruptedException {
tempDir = Paths.get("target/TestSecureClientZooKeeperFactory");
dataDir = tempDir.resolve("state");
serverKeyStore = tempDir.resolve("server.keystore" + KEYSTORE_TYPE_EXT);
serverTrustStore = tempDir.resolve("server.truststore" + KEYSTORE_TYPE_EXT);
clientKeyStore = tempDir.resolve("client.keystore" + KEYSTORE_TYPE_EXT);
clientTrustStore = tempDir.resolve("client.truststore" + KEYSTORE_TYPE_EXT);
clientPort = InstanceSpec.getRandomPort();
Files.createDirectory(tempDir);
final X509Certificate clientCert = createKeyStore("client", TEST_PASSWORD, clientKeyStore, KEYSTORE_TYPE);
final X509Certificate serverCert = createKeyStore("zookeeper", TEST_PASSWORD, serverKeyStore, KEYSTORE_TYPE);
createTrustStore(serverCert, "zookeeper", TEST_PASSWORD, clientTrustStore, KEYSTORE_TYPE);
createTrustStore(clientCert, "client", TEST_PASSWORD, serverTrustStore, KEYSTORE_TYPE);
clientProperties = createClientProperties(
clientPort,
clientKeyStore,
KEYSTORE_TYPE,
TEST_PASSWORD,
clientTrustStore,
KEYSTORE_TYPE,
TEST_PASSWORD
);
serverConnectionFactory = createAndStartServer(
dataDir,
tempDir,
clientPort,
serverKeyStore,
TEST_PASSWORD,
serverTrustStore,
TEST_PASSWORD
);
zkServer = serverConnectionFactory.getZooKeeperServer();
}
@AfterClass
public static void cleanup() {
if (serverConnectionFactory != null) {
try {
serverConnectionFactory.shutdown();
} catch (final Exception ignore) {}
try {
zkServer.shutdown();
} catch (final Exception ignore) {}
}
if (tempDir != null) {
final List<Path> files = Arrays.asList(
dataDir.resolve("version-2/snapshot.0"),
dataDir.resolve("version-2/log.1"),
dataDir.resolve("version-2"),
dataDir.resolve("myid"),
serverKeyStore,
serverTrustStore,
clientKeyStore,
clientTrustStore,
dataDir,
tempDir
);
files.forEach(p -> {
try {
if (p != null) Files.deleteIfExists(p);
} catch (final IOException ioe) {}
});
}
}
@Test
public void testValidCnxnSocketName() {
assertEquals("org.apache.zookeeper.ClientCnxnSocketNetty", SecureClientZooKeeperFactory.NETTY_CLIENT_CNXN_SOCKET);
}
@Test(timeout = 30_000)
public void testServerCreatePath() throws Exception {
final ZooKeeperClientConfig zkClientConfig =
ZooKeeperClientConfig.createConfig(clientProperties);
final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder()
.connectString(zkClientConfig.getConnectString())
.sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
.retryPolicy(new RetryOneTime(1000))
.defaultData(new byte[0])
.zookeeperFactory(new SecureClientZooKeeperFactory(zkClientConfig));
final CuratorFramework client = clientBuilder.build();
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
private static ServerCnxnFactory createAndStartServer(final Path dataDir,
final Path tempDir, final int clientPort, final Path keyStore,
final String keyStorePassword, final Path trustStore,
final String trustStorePassword) throws IOException, InterruptedException {
final ClientX509Util x509Util = new ClientX509Util();
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, NETTY_SERVER_CNXN_FACTORY);
System.setProperty(x509Util.getSslAuthProviderProperty(), "x509");
System.setProperty(x509Util.getSslKeystoreLocationProperty(), keyStore.toString());
System.setProperty(x509Util.getSslKeystorePasswdProperty(), keyStorePassword);
System.setProperty(x509Util.getSslTruststoreLocationProperty(), trustStore.toString());
System.setProperty(x509Util.getSslTruststorePasswdProperty(), trustStorePassword);
System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
ZooKeeperServer zkServer = new ZooKeeperServer(dataDir.toFile(), dataDir.toFile(), 2000);
ServerCnxnFactory secureConnectionFactory = ServerCnxnFactory.createFactory(clientPort, -1);
secureConnectionFactory.configure(new InetSocketAddress(clientPort), -1, true);
secureConnectionFactory.startup(zkServer);
return secureConnectionFactory;
}
private static NiFiProperties createClientProperties(final int clientPort,
final Path keyStore, final String keyStoreType, final String keyStorePassword,
final Path trustStore, final String trustStoreType, final String trustStorePassword) {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, String.format("localhost:%d", clientPort));
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, keyStore.toString());
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, keyStoreType);
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, keyStorePassword);
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, trustStore.toString());
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, trustStoreType);
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, trustStorePassword);
return new StandardNiFiProperties(properties);
}
private static X509Certificate createKeyStore(final String alias,
final String password, final Path path, final String keyStoreType)
throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
try (final FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
final X509Certificate selfSignedCert = CertificateUtils.generateSelfSignedX509Certificate(
keyPair, "CN=localhost", "SHA256withRSA", 365
);
final char[] passwordChars = password.toCharArray();
final KeyStore keyStore = KeyStore.getInstance(keyStoreType);
keyStore.load(null, null);
keyStore.setKeyEntry(alias, keyPair.getPrivate(), passwordChars,
new Certificate[]{selfSignedCert});
keyStore.store(outputStream, passwordChars);
return selfSignedCert;
}
}
private static void createTrustStore(final X509Certificate cert,
final String alias, final String password, final Path path, final String keyStoreType)
throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
try (final FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
final KeyStore trustStore = KeyStore.getInstance(keyStoreType);
trustStore.load(null, null);
trustStore.setCertificateEntry(alias, cert);
trustStore.store(outputStream, password.toCharArray());
}
}
}

View File

@ -202,6 +202,13 @@
<nifi.zookeeper.connect.timeout>10 secs</nifi.zookeeper.connect.timeout>
<nifi.zookeeper.session.timeout>10 secs</nifi.zookeeper.session.timeout>
<nifi.zookeeper.root.node>/nifi</nifi.zookeeper.root.node>
<nifi.zookeeper.client.secure>false</nifi.zookeeper.client.secure>
<nifi.zookeeper.security.keystore />
<nifi.zookeeper.security.keystoreType />
<nifi.zookeeper.security.keystorePasswd />
<nifi.zookeeper.security.truststore />
<nifi.zookeeper.security.truststoreType />
<nifi.zookeeper.security.truststorePasswd />
<nifi.zookeeper.auth.type />
<nifi.zookeeper.kerberos.removeHostFromPrincipal />
<nifi.zookeeper.kerberos.removeRealmFromPrincipal />

View File

@ -236,6 +236,13 @@ nifi.zookeeper.connect.string=${nifi.zookeeper.connect.string}
nifi.zookeeper.connect.timeout=${nifi.zookeeper.connect.timeout}
nifi.zookeeper.session.timeout=${nifi.zookeeper.session.timeout}
nifi.zookeeper.root.node=${nifi.zookeeper.root.node}
nifi.zookeeper.client.secure=${nifi.zookeeper.client.secure}
nifi.zookeeper.security.keystore=${nifi.zookeeper.security.keystore}
nifi.zookeeper.security.keystoreType=${nifi.zookeeper.security.keystoreType}
nifi.zookeeper.security.keystorePasswd=${nifi.zookeeper.security.keystorePasswd}
nifi.zookeeper.security.truststore=${nifi.zookeeper.security.truststore}
nifi.zookeeper.security.truststoreType=${nifi.zookeeper.security.truststoreType}
nifi.zookeeper.security.truststorePasswd=${nifi.zookeeper.security.truststorePasswd}
# Zookeeper properties for the authentication scheme used when creating acls on znodes used for cluster management
# Values supported for nifi.zookeeper.auth.type are "default", which will apply world/anyone rights on znodes