NIFI-7356 - Config TLS for embedded ZooKeeper when NiFi TLS enabled.

NIFI-7356 - Addresses PR feedback.

NIFI-7356 - Additional changes from PR feedback.

NIFI-7356 - Adding integration tests for ZooKeeperStateServer for TLS.

NIFI-7356 - TLS + Zookeeper now working with single and quorum. Needs code cleanup, need to fix IT tests and docs.

NIFI-7356 - Fixed up tests and removed some irrelevant ones. Refactored some of ZooKeeperStateServer. Tested successfully with a secure and insecure 3 node NiFi + Quorum.

NIFI-7356 - Checkstyle fixes.

NIFI-7356 - Updated administration guide with embedded ZooKeeper TLS configuration.

NIFI-7356 - Updated the way ZooKeeper TLS properties are set/mapped from NiFi properties.

NIFI-7356 - Updated per review, using NiFiProperties keystore strings, classname for ocnnection factory, adjusted TLS configuration checks in NiFiProperties.

NIFI-7356 - Updated configuration validation logic and added tests.

NIFI-7356 - Codestyle check fixes.

NIFI-7356 - Updated some of the log messages.

NIFI-7356 - Updated as per code review.

NIFI-7356 - Fixed max port number.

NIFI-7356 - Updated admin guide and updated small code issues as per code review.

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes #4753.
This commit is contained in:
Troy Melhase 2020-04-16 20:05:23 -08:00 committed by Nathan Gough
parent 6741317cc4
commit 76648bdc0b
19 changed files with 1239 additions and 30 deletions

View File

@ -1540,6 +1540,13 @@ public abstract class NiFiProperties {
&& getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD) != null;
}
public boolean isTlsConfigurationPresent() {
return StringUtils.isNotBlank(getProperty(NiFiProperties.SECURITY_KEYSTORE))
&& getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD) != null
&& StringUtils.isNotBlank(getProperty(NiFiProperties.SECURITY_TRUSTSTORE))
&& getProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD) != null;
}
public int size() {
return getPropertyKeys().size();
}

View File

@ -16,12 +16,8 @@
*/
package org.apache.nifi.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.net.InetSocketAddress;
@ -32,8 +28,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class NiFiPropertiesTest {
@ -327,4 +328,50 @@ public class NiFiPropertiesTest {
assertTrue(properties.isZooKeeperClientSecure());
assertTrue(properties.isZooKeeperTlsConfigurationPresent());
}
@Test
public void testKeystorePasswordIsMissing() {
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
put(NiFiProperties.SECURITY_KEYSTORE, "/a/keystore/filepath/keystore.jks");
put(NiFiProperties.SECURITY_KEYSTORE_TYPE, "JKS");
put(NiFiProperties.SECURITY_TRUSTSTORE, "/a/truststore/filepath/truststore.jks");
put(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, "");
put(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, "JKS");
}});
assertFalse(properties.isTlsConfigurationPresent());
}
@Test
public void testTlsConfigurationIsPresentWithEmptyPasswords() {
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
put(NiFiProperties.SECURITY_KEYSTORE, "/a/keystore/filepath/keystore.jks");
put(NiFiProperties.SECURITY_KEYSTORE_PASSWD, "");
put(NiFiProperties.SECURITY_KEYSTORE_TYPE, "JKS");
put(NiFiProperties.SECURITY_TRUSTSTORE, "/a/truststore/filepath/truststore.jks");
put(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, "");
put(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, "JKS");
}});
assertTrue(properties.isTlsConfigurationPresent());
}
@Test
public void testTlsConfigurationIsNotPresentWithPropertiesMissing() {
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
put(NiFiProperties.SECURITY_KEYSTORE_PASSWD, "password");
put(NiFiProperties.SECURITY_KEYSTORE_TYPE, "JKS");
put(NiFiProperties.SECURITY_TRUSTSTORE, "/a/truststore/filepath/truststore.jks");
}});
assertFalse(properties.isTlsConfigurationPresent());
}
@Test
public void testTlsConfigurationIsNotPresentWithNoProperties() {
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
}});
assertFalse(properties.isTlsConfigurationPresent());
}
}

View File

@ -2246,6 +2246,55 @@ _true_. Once Netty is enabled, you should see log messages like the following in
2020-02-24 23:37:54,082 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty SSL handler added for channel: [id: 0xa831f9c3]
2020-02-24 23:37:54,104 INFO [nioEventLoopGroup-3-1] o.apache.zookeeper.ClientCnxnSocketNetty channel is connected: [id: 0xa831f9c3, L:/172.17.0.4:56510 - R:8e38869cd1d1/172.17.0.3:2281]
=== Embedded ZooKeeper with TLS
A NiFi cluster can be deployed using a ZooKeeper instance(s) embedded in NiFi itself which all nodes can communicate with. As of NiFi 1.13.0, communication between nodes and this embedded ZooKeeper can now be secured with TLS. Versions of NiFi prior to 1.13 did not use secure client access with embedded ZooKeeper(s). The configuration for the client side of the connection will operate in the same way as an external ZooKeeper. That is, it will use the `+nifi.security.*+` properties from the nifi.properties file by default, unless you specifiy explicit ZooKeeper keystore/truststore properties with `+nifi.zookeeper.security.*+` as described above.
The server configuration will operate in the same way as an insecure embedded server, but with the `+secureClientPort+` set (typically port `+2281+`).
Example _$NIFI_HOME/conf/zookeeper.properties_ file:
[source]
secureClientPort=2281
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=./state/zookeeper
autopurge.snapRetainCount=30
server.1=nifi1.example.com:2888:3888
server.2=nifi2.example.com:2888:3888
server.3=nifi3.example.com:2888:3888
When used with a three node NiFi cluster, the above configuration file would establish a three node ZooKeeper quorum with each node listening on secure port 2281 for client connections with NiFi, 2888 for quorum communication and 3888 for leader election.
NOTE: When using a secure server, the secure embedded ZooKeeper server ignores any +clientPort+ or +clientPortAddress+ specified in _$NIFI_HOME/conf/zookeeper.properties_. I.e., if the NiFi-embedded ZooKeeper exposes a +secureClientPort+ it will not expose an insecure +clientPort+ regardless of configuration. This is a behavioral difference between the embedded server and an external ZooKeeper server and ensures the embedded ZooKeeper will either run securely, or insecurely, but not a mixture of both.
The following is an example of the relevant properties to set in _$NIFI_HOME/conf/nifi.properties_ to run and connect to this quorum:
[source]
--
nifi.security.keystore=./conf/keystore.jks
nifi.security.keystoreType=jks
nifi.security.keystorePasswd=password
nifi.security.keyPasswd=password
nifi.security.truststore=./conf/truststore.jks
nifi.security.truststoreType=jks
nifi.security.truststorePasswd=password
nifi.security.user.authorizer=managed-authorizer
nifi.zookeeper.connect.string=nifi1.example.com:2281,nifi2.example.com:2281,nifi3.example.com:2281
nifi.zookeeper.connect.timeout=10 secs
nifi.zookeeper.session.timeout=10 secs
nifi.zookeeper.root.node=/nifi
nifi.zookeeper.client.secure=true
nifi.state.management.embedded.zookeeper.start=true
nifi.state.management.embedded.zookeeper.properties=./conf/zookeeper.properties
nifi.state.management.configuration.file=./conf/state-management.xml
nifi.state.management.provider.cluster=zk-provider
--
[[zookeeper_migrator]]
=== ZooKeeper Migrator
You can use the `zk-migrator` tool to perform the following tasks:

View File

@ -274,6 +274,8 @@
<exclude>src/test/resources/old-swap-file.swap</exclude>
<exclude>src/test/resources/xxe_template.xml</exclude>
<exclude>src/test/resources/swap/444-old-swap-file.swap</exclude>
<exclude>src/test/resources/ZooKeeperStateServerConfigurationsTest/keystore.jks</exclude>
<exclude>src/test/resources/ZooKeeperStateServerConfigurationsTest/truststore.jks</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -16,18 +16,18 @@
*/
package org.apache.nifi.controller.cluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.common.PathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class ZooKeeperClientConfig {
public static final String NETTY_CLIENT_CNXN_SOCKET =
@ -217,7 +217,7 @@ public class ZooKeeperClientConfig {
* @param defaultPropertyName The backup property to get from NiFi properties if the preferred property is not present.
* @return Returns the property in order of preference.
*/
private static String getPreferredProperty(final NiFiProperties properties, final String preferredPropertyName, final String defaultPropertyName) {
public static String getPreferredProperty(final NiFiProperties properties, final String preferredPropertyName, final String defaultPropertyName) {
String retrievedProperty = properties.getProperty(preferredPropertyName);
if(StringUtils.isBlank(retrievedProperty)) {

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.state.server;
import org.apache.zookeeper.common.X509Util;
public class ZooKeeperQuorumX509Util extends X509Util {
@Override
protected String getConfigPrefix() {
return "ssl.quorum.";
}
@Override
protected boolean shouldVerifyClientHostname() {
return true;
}
}

View File

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.state.server;
import org.apache.zookeeper.common.X509Util;
public class ZooKeeperServerX509Util extends X509Util {
@Override
protected String getConfigPrefix() {
return "ssl.";
}
@Override
protected boolean shouldVerifyClientHostname() {
return true;
}
}

View File

@ -17,8 +17,11 @@
package org.apache.nifi.controller.state.server;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.common.X509Util;
import org.apache.zookeeper.server.DatadirCleanupManager;
import org.apache.zookeeper.server.NettyServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZKDatabase;
@ -36,11 +39,17 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.util.Properties;
public class ZooKeeperStateServer extends ZooKeeperServerMain {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateServer.class);
private static final int MIN_PORT = 1024;
private static final int MAX_PORT = 65535;
private static final String ZOOKEEPER_SSL_QUORUM = "sslQuorum";
private static final String ZOOKEEPER_PORT_UNIFICATION = "portUnification";
private static final String ZOOKEEPER_SERVER_CNXN_FACTORY = "serverCnxnFactory";
private final QuorumPeerConfig quorumPeerConfig;
private volatile boolean started = false;
@ -50,9 +59,13 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
private QuorumPeer quorumPeer;
private DatadirCleanupManager datadirCleanupManager;
private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
quorumPeerConfig = new QuorumPeerConfig();
quorumPeerConfig.parseProperties(zkProperties);
private ZooKeeperStateServer(final QuorumPeerConfig config) {
quorumPeerConfig = config;
}
// Expose the configuration for verification + testing:
final QuorumPeerConfig getQuorumPeerConfig() {
return quorumPeerConfig;
}
public synchronized void start() throws IOException {
@ -93,7 +106,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
try {
Thread.sleep(50L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();;
Thread.currentThread().interrupt();
}
}
}
@ -105,7 +118,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());
connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
connectionFactory.configure(getAvailableSocketAddress(config), config.getMaxClientCnxns(), quorumPeerConfig.isSslQuorum());
connectionFactory.startup(embeddedZkServer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@ -122,11 +135,18 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
try {
transactionLog = new FileTxnSnapLog(quorumPeerConfig.getDataLogDir(), quorumPeerConfig.getDataDir());
connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(quorumPeerConfig.getClientPortAddress(), quorumPeerConfig.getMaxClientCnxns());
connectionFactory.configure(getAvailableSocketAddress(quorumPeerConfig), quorumPeerConfig.getMaxClientCnxns(), quorumPeerConfig.isSslQuorum());
quorumPeer = new QuorumPeer();
// Set the secure connection factory if the quorum is supposed to be secure.
if (quorumPeerConfig.isSslQuorum()) {
quorumPeer.setSecureCnxnFactory(connectionFactory);
} else {
quorumPeer.setCnxnFactory(connectionFactory);
}
quorumPeer.setTxnFactory(new FileTxnSnapLog(quorumPeerConfig.getDataLogDir(), quorumPeerConfig.getDataDir()));
quorumPeer.setElectionType(quorumPeerConfig.getElectionAlg());
quorumPeer.setMyid(quorumPeerConfig.getServerId());
@ -136,11 +156,11 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
quorumPeer.setInitLimit(quorumPeerConfig.getInitLimit());
quorumPeer.setSyncLimit(quorumPeerConfig.getSyncLimit());
quorumPeer.setQuorumVerifier(quorumPeerConfig.getQuorumVerifier(), false);
quorumPeer.setCnxnFactory(connectionFactory);
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setLearnerType(quorumPeerConfig.getPeerType());
quorumPeer.setSyncEnabled(quorumPeerConfig.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(quorumPeerConfig.getQuorumListenOnAllIPs());
quorumPeer.setSslQuorum(quorumPeerConfig.isSslQuorum());
quorumPeer.start();
} catch (final IOException ioe) {
@ -198,6 +218,142 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {
zkProperties.load(bis);
}
return new ZooKeeperStateServer(zkProperties);
return new ZooKeeperStateServer(reconcileProperties(properties, zkProperties));
}
/**
* Reconcile properties between the nifi.properties and zookeeper.properties (zoo.cfg) files. Most of the ZooKeeper server properties are derived from
* the zookeeper.properties file, while the TLS key/truststore properties are taken from nifi.properties.
* @param niFiProperties NiFiProperties file containing ZooKeeper client and TLS configuration
* @param zkProperties The zookeeper.properties file containing ZooKeeper server configuration
* @return A reconciled QuorumPeerConfig which will include TLS properties set if they are available.
* @throws IOException If configuration files fail to parse.
* @throws ConfigException If secure configuration is not as expected. Check administration documentation.
*/
private static QuorumPeerConfig reconcileProperties(NiFiProperties niFiProperties, Properties zkProperties) throws IOException, ConfigException {
QuorumPeerConfig peerConfig = new QuorumPeerConfig();
peerConfig.parseProperties(zkProperties);
final boolean niFiConfigIsSecure = isNiFiConfigSecureForZooKeeper(niFiProperties);
final boolean zooKeeperConfigIsSecure = isZooKeeperConfigSecure(peerConfig);
if (!zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
logger.debug("{} property is set to false or is not present, and zookeeper.properties file does not contain secureClientPort property, so embedded ZooKeeper will be started without TLS",
NiFiProperties.ZOOKEEPER_CLIENT_SECURE);
return peerConfig;
}
// If secureClientPort is set but no TLS config is set, fail to start.
if (zooKeeperConfigIsSecure && !niFiConfigIsSecure) {
throw new ConfigException(
String.format("ZooKeeper properties file %s was configured to be secure but there was no valid TLS config present in nifi.properties or " +
"nifi.zookeeper.client.secure was set to false. Check the administration guide",
niFiProperties.getProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES)));
}
// Remove any insecure ports if they were set in zookeeper.properties
ensureOnlySecurePortsAreEnabled(peerConfig, zkProperties);
// Set base ZooKeeper TLS server properties
setTlsProperties(zkProperties, new ZooKeeperServerX509Util(), niFiProperties);
// Set quorum ZooKeeper TLS server properties
setTlsProperties(zkProperties, new ZooKeeperQuorumX509Util(), niFiProperties);
// Set TLS client port:
zkProperties.setProperty("secureClientPort", getSecurePort(peerConfig));
// Set the required connection factory for TLS
zkProperties.setProperty(ZOOKEEPER_SERVER_CNXN_FACTORY, NettyServerCnxnFactory.class.getName());
zkProperties.setProperty(ZOOKEEPER_SSL_QUORUM, Boolean.TRUE.toString());
// Port unification allows both secure and insecure connections - setting to false means only secure connections will be allowed.
zkProperties.setProperty(ZOOKEEPER_PORT_UNIFICATION, Boolean.FALSE.toString());
// Recreate and reload the adjusted properties to ensure they're still valid for ZK:
peerConfig = new QuorumPeerConfig();
peerConfig.parseProperties(zkProperties);
return peerConfig;
}
private static boolean isZooKeeperConfigSecure(QuorumPeerConfig peerConfig) throws ConfigException {
InetSocketAddress secureAddress = peerConfig.getSecureClientPortAddress();
InetSocketAddress insecureAddress = peerConfig.getClientPortAddress();
if(secureAddress == null && insecureAddress == null) {
throw new ConfigException("No clientAddress or secureClientAddress is set in zookeeper.properties");
}
return secureAddress != null;
}
/**
* Verify whether the NiFi properties portion of ZooKeeper are correctly configured for TLS or not
* @param niFiProperties The loaded nifi.properties
* @return True if NiFi has TLS configuration and the property nifi.zookeeper.client.secure=true, otherwise false or configuration exception
* @throws ConfigException If nifi.zookeeper.client.secure=true but no TLS configuration is present
*/
private static boolean isNiFiConfigSecureForZooKeeper(NiFiProperties niFiProperties) throws ConfigException {
final boolean isTlsConfigPresent = niFiProperties.isZooKeeperTlsConfigurationPresent() || niFiProperties.isTlsConfigurationPresent();
final boolean isZooKeeperClientSecure = niFiProperties.isZooKeeperClientSecure();
if(isZooKeeperClientSecure && !isTlsConfigPresent) {
throw new ConfigException(String.format("%s is true but no TLS configuration is present in nifi.properties", NiFiProperties.ZOOKEEPER_CLIENT_SECURE));
}
return (isZooKeeperClientSecure && isTlsConfigPresent);
}
private static void ensureOnlySecurePortsAreEnabled(QuorumPeerConfig config, Properties zkProperties) {
// Remove plaintext client ports and addresses and warn if set, see NIFI-7203:
InetSocketAddress clientPort = config.getClientPortAddress();
InetSocketAddress secureClientPort = config.getSecureClientPortAddress();
if (clientPort != null && secureClientPort != null) {
zkProperties.remove("clientPort");
zkProperties.remove("clientPortAddress");
logger.warn("Invalid configuration was detected: A secure NiFi with an embedded ZooKeeper was configured for insecure connections. " +
"Insecure ports have been removed from embedded ZooKeeper configuration to deactivate insecure connections");
}
}
private static void setTlsProperties(Properties zooKeeperProperties, X509Util zooKeeperUtil, NiFiProperties niFiProperties) {
zooKeeperProperties.setProperty(zooKeeperUtil.getSslKeystoreLocationProperty(),
ZooKeeperClientConfig.getPreferredProperty(niFiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, NiFiProperties.SECURITY_KEYSTORE));
zooKeeperProperties.setProperty(zooKeeperUtil.getSslKeystorePasswdProperty(),
ZooKeeperClientConfig.getPreferredProperty(niFiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, NiFiProperties.SECURITY_KEYSTORE_PASSWD));
zooKeeperProperties.setProperty(zooKeeperUtil.getSslKeystoreTypeProperty(),
ZooKeeperClientConfig.getPreferredProperty(niFiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, NiFiProperties.SECURITY_KEYSTORE_TYPE));
zooKeeperProperties.setProperty(zooKeeperUtil.getSslTruststoreLocationProperty(),
ZooKeeperClientConfig.getPreferredProperty(niFiProperties, NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, NiFiProperties.SECURITY_TRUSTSTORE));
zooKeeperProperties.setProperty(zooKeeperUtil.getSslTruststorePasswdProperty(),
ZooKeeperClientConfig.getPreferredProperty(niFiProperties, NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, NiFiProperties.SECURITY_TRUSTSTORE_PASSWD));
zooKeeperProperties.setProperty(zooKeeperUtil.getSslTruststoreTypeProperty(),
ZooKeeperClientConfig.getPreferredProperty(niFiProperties, NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, NiFiProperties.SECURITY_TRUSTSTORE_TYPE));
}
private static String getSecurePort(QuorumPeerConfig peerConfig) throws ConfigException {
final InetSocketAddress secureClientAddress = peerConfig.getSecureClientPortAddress();
String secureClientPort = null;
if (secureClientAddress != null && secureClientAddress.getPort() >= MIN_PORT && secureClientAddress.getPort() <= MAX_PORT) {
secureClientPort = String.valueOf(secureClientAddress.getPort());
if (logger.isDebugEnabled()) {
logger.debug("Secure client port retrieved from ZooKeeper configuration: {}", secureClientPort);
}
return secureClientPort;
} else {
throw new ConfigException(String.format("NiFi was configured to be secure but secureClientPort could not be retrieved from zookeeper.properties file or it was not " +
"in valid port range %d - %d", MIN_PORT, MAX_PORT));
}
}
private static InetSocketAddress getAvailableSocketAddress(ServerConfig config) {
return config.getSecureClientPortAddress() != null ? config.getSecureClientPortAddress() : config.getClientPortAddress();
}
private static InetSocketAddress getAvailableSocketAddress(QuorumPeerConfig quorumConfig) {
final ServerConfig serverConfig = new ServerConfig();
serverConfig.readFrom(quorumConfig);
return getAvailableSocketAddress(serverConfig);
}
}

View File

@ -49,7 +49,7 @@ import java.util.LinkedHashMap;
import java.util.Map;
import static org.apache.nifi.leader.election.ITSecureClientZooKeeperFactory.createAndStartServer;
import static org.apache.nifi.leader.election.ITSecureClientZooKeeperFactory.createClientProperties;
import static org.apache.nifi.leader.election.ITSecureClientZooKeeperFactory.createSecureClientProperties;
public class ITZooKeeperStateProvider extends AbstractTestStateProvider {
@ -94,7 +94,7 @@ public class ITZooKeeperStateProvider extends AbstractTestStateProvider {
zkServer = serverConnectionFactory.getZooKeeperServer();
// Set up state provider (client) TLS properties, normally injected through StateProviderContext annotation
nifiProperties = createClientProperties(
nifiProperties = createSecureClientProperties(
clientPort,
Paths.get(CLIENT_KEYSTORE),
CLIENT_KEYSTORE_TYPE,

View File

@ -0,0 +1,561 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.state.server;
import org.apache.commons.io.FileUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.DefaultZookeeperFactory;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.nifi.controller.cluster.SecureClientZooKeeperFactory;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.apache.nifi.leader.election.ITSecureClientZooKeeperFactory.createSecureClientProperties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
// Testing setting up a ZooKeeperStateServer with TLS
public class ITZooKeeperStateServerTLS {
private static final String KEY_STORE = getPath("keystore.jks");
private static final String TRUST_STORE = getPath("truststore.jks");
private static final String STORE_TYPE = "JKS";
private static final String INSECURE_ZOOKEEPER_PROPS = getPath("insecure.zookeeper.properties");
private static final String PARTIAL_ZOOKEEPER_PROPS = getPath("partial.zookeeper.properties");
private static final String SECURE_ZOOKEEPER_PROPS = getPath("secure.zookeeper.properties");
private static final String ZOOKEEPER_PROPERTIES_FILE_KEY = "nifi.state.management.embedded.zookeeper.properties";
private static final String ZOOKEEPER_CNXN_FACTORY = "org.apache.zookeeper.server.NettyServerCnxnFactory";
private static final String QUORUM_CONNECT_STRING = "node0.apache.org:2281,node1.apache.org:2281";
@Rule
public ExpectedException expectedException = ExpectedException.none();
private static final Map<String, String> INSECURE_NIFI_PROPS = new HashMap<String, String>() {{
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.WEB_HTTP_HOST, "localhost");
put(NiFiProperties.WEB_HTTP_PORT, "8080");
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
}};
private static final String TEST_PASSWORD = "passwordpassword";
private static final Map<String, String> SECURE_NIFI_PROPS = new HashMap<String, String>() {{
put(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.WEB_HTTPS_PORT, "8443");
put(NiFiProperties.SECURITY_KEYSTORE, KEY_STORE);
put(NiFiProperties.SECURITY_KEYSTORE_TYPE, STORE_TYPE);
put(NiFiProperties.SECURITY_KEYSTORE_PASSWD, TEST_PASSWORD);
put(NiFiProperties.SECURITY_TRUSTSTORE, TRUST_STORE);
put(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, STORE_TYPE);
put(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, TEST_PASSWORD);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
}};
private static final Map<String, String> SECURE_ZOOKEEPER_NIFI_PROPS = new HashMap<String, String>() {{
put(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.WEB_HTTPS_PORT, "8443");
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, KEY_STORE);
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, STORE_TYPE);
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, TEST_PASSWORD);
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, TRUST_STORE);
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, STORE_TYPE);
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, TEST_PASSWORD);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
}};
private NiFiProperties niFiProps;
private static NiFiProperties clientProperties;
private QuorumPeerConfig quorumPeerConfig;
private Properties secureZooKeeperProps;
private Properties insecureZooKeeperProps;
private Properties partialZooKeeperProps;
private ZooKeeperStateServer server;
@Before
public void setupWithValidProperties() throws IOException, QuorumPeerConfig.ConfigException {
niFiProps = NiFiProperties.createBasicNiFiProperties(null, SECURE_NIFI_PROPS);
assertNotNull(niFiProps);
// This shows that a ZooKeeper server is created from valid NiFi properties:
final ZooKeeperStateServer zooKeeperStateServer = ZooKeeperStateServer.create(niFiProps);
assertNotNull(zooKeeperStateServer);
quorumPeerConfig = zooKeeperStateServer.getQuorumPeerConfig();
assertNotNull(quorumPeerConfig);
secureZooKeeperProps = new Properties();
secureZooKeeperProps.load(FileUtils.openInputStream(new File(SECURE_ZOOKEEPER_PROPS)));
assertNotNull(secureZooKeeperProps);
insecureZooKeeperProps = new Properties();
insecureZooKeeperProps.load(FileUtils.openInputStream(new File(INSECURE_ZOOKEEPER_PROPS)));
assertNotNull(insecureZooKeeperProps);
partialZooKeeperProps = new Properties();
partialZooKeeperProps.load(FileUtils.openInputStream(new File(PARTIAL_ZOOKEEPER_PROPS)));
assertNotNull(partialZooKeeperProps);
}
@After
public void clearConnectionProperties() {
Collections.unmodifiableSet(System.getProperties().stringPropertyNames()).stream()
.filter(name -> name.startsWith("zookeeper."))
.forEach(System::clearProperty);
}
// This test shows that a ZooKeeperStateServer cannot be created from empty NiFi properties.
@Test
public void testCreateFromEmptyNiFiProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties emptyProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<>());
assertNotNull(emptyProps);
Assert.assertNull(ZooKeeperStateServer.create(emptyProps));
}
// This test shows that a ZooKeeperStateServer can be created from insecure NiFi properties.
@Test
public void testCreateFromValidInsecureNiFiProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties insecureProps = NiFiProperties.createBasicNiFiProperties(null, INSECURE_NIFI_PROPS);
final ZooKeeperStateServer server = ZooKeeperStateServer.create(insecureProps);
assertNotNull(server);
assertNotNull(server.getQuorumPeerConfig().getClientPortAddress());
}
// This test shows that a ZK TLS config with some but not all values throws an exception:
@Test
public void testCreateFromPartialZooKeeperTlsProperties() {
final NiFiProperties partialProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, PARTIAL_ZOOKEEPER_PROPS);
}});
Assert.assertThrows(QuorumPeerConfig.ConfigException.class, () ->
ZooKeeperStateServer.create(partialProps));
}
// This test shows that a ZK TLS config with all values set is valid and a server can be created using it:
@Test
public void testCreateFromCompleteZooKeeperTlsProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties completeProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
assertNotNull(ZooKeeperStateServer.create(completeProps));
}
// This test shows that the client can specify a secure port and that port is used:
@Test
public void testCreateWithSpecifiedSecureClientPort() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties secureProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
final ZooKeeperStateServer server = ZooKeeperStateServer.create(secureProps);
assertNotNull(server);
final QuorumPeerConfig config = server.getQuorumPeerConfig();
assertEquals(secureZooKeeperProps.getProperty("secureClientPort"), String.valueOf(config.getSecureClientPortAddress().getPort()));
}
// This shows that a secure NiFi with an insecure ZooKeeper will not have an insecure client address or port:
@Test
public void testCreateRemovesInsecureClientPort() {
assertNotNull(insecureZooKeeperProps.getProperty("clientPort"));
Assert.assertNotEquals(insecureZooKeeperProps.getProperty("clientPort"), "");
Assert.assertNull(quorumPeerConfig.getClientPortAddress());
}
// This test shows that a connection class is set when none is specified (QuorumPeerConfig::parseProperties sets the System property):
@Test
public void testCreateWithUnspecifiedConnectionClass() {
assertEquals(org.apache.zookeeper.server.NettyServerCnxnFactory.class.getName(), System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY));
}
// This test shows that a specified connection class is honored (QuorumPeerConfig::parseProperties sets the System property):
@Test
public void testCreateWithSpecifiedConnectionClass() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties secureProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
assertNotNull(ZooKeeperStateServer.create(secureProps));
assertEquals(ZOOKEEPER_CNXN_FACTORY, System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY));
}
@After
public void tearDown() throws Exception {
if(server != null) {
File stateDir = server.getQuorumPeerConfig().getDataDir().getParentFile();
deleteRecursively(stateDir);
server.shutdown();
server = null;
}
}
private void deleteRecursively(final File file) {
final File[] children = file.listFiles();
if (children != null) {
for (final File child : children) {
deleteRecursively(child);
}
}
file.delete();
}
// Connect to a secure ZooKeeperStateServer
@Test
public void testSecureClientQuorumConnectString() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, QUORUM_CONNECT_STRING);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getSecureZooKeeperClient(serverPort);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
// Connect to an insecure ZooKeeperStateServer with an insecure client (ensure insecure setup still works)
@Test
public void testInsecureZooKeeperWithInsecureClient() throws Exception {
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort", "0"));
final String connect = "localhost:" + 2381;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getClientPortAddress().getPort();
//assertEquals(actualPort, 2381);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
// Fail to connect to a secure ZooKeeperStateServer with insecure client configuration
@Test(expected = KeeperException.ConnectionLossException.class)
public void testSecureZooKeeperStateServerWithInsecureClient() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
// Expect this to fail with ConnectionLossException
final String createResult = client.create().forPath(testPath, new byte[0]);
}
// Fail to connect to a secure ZooKeeperStateServer with missing client configuration
@Test(expected = KeeperException.ConnectionLossException.class)
public void testSecureZooKeeperStateServerWithMissingClientConfiguration() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
// Expect this to fail with ConnectionLossException
final String createResult = client.create().forPath(testPath, new byte[0]);
}
// Connect to a secure ZooKeeperStateServer
@Test
public void testSecureClientConnection() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getSecureZooKeeperClient(serverPort);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
// Connect to an insecure ZooKeeperStateServer
@Test
public void testClientSecureFalseClientPortNoTls() throws Exception {
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort", "3000"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getInsecureZooKeeperClient(INSECURE_NIFI_PROPS, connect);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
@Test
public void testClientSecureFalseWithClientSecurePortAndNoTls() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage("ZooKeeper properties file src/test/resources/TestZooKeeperStateServerConfigurations/secure.zookeeper.properties was " +
"configured to be secure but there was no valid TLS config present in nifi.properties or nifi.zookeeper.client.secure was set to false. Check the administration guide");
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testClientSecureTrueWithInsecureZooKeeperAndTlsSet() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage("NiFi was configured to be secure but secureClientPort could not be retrieved from zookeeper.properties file");
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testClientSecureTrueWithNoTls() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage(NiFiProperties.ZOOKEEPER_CLIENT_SECURE + " is true but no TLS configuration is present in nifi.properties");
final int actualPort = Integer.parseInt(insecureZooKeeperProps.getProperty("clientPort"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(INSECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testZooKeeperMissingPortSettings() throws Exception {
expectedException.expect(QuorumPeerConfig.ConfigException.class);
expectedException.expectMessage("No clientAddress or secureClientAddress is set in zookeeper.properties");
final String connect = "localhost:" + 2181;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, PARTIAL_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
@Test
public void testClientSecureFalseAndOnlyZooKeeperClientPortSetWithTlsProperties() throws Exception {
final String connect = "localhost:" + 2181;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
}
// Connect to a secure ZooKeeperStateServer with ZooKeeper.
@Test
public void testSecureClientConnectionWithZooKeeperSecurityProperties() throws Exception {
final int actualPort = Integer.parseInt(secureZooKeeperProps.getProperty("secureClientPort", "0"));
final String connect = "localhost:" + actualPort;
final NiFiProperties validZkClientProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_ZOOKEEPER_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connect);
}});
server = ZooKeeperStateServer.create(validZkClientProps);
assertNotNull(server);
final int serverPort = server.getQuorumPeerConfig().getSecureClientPortAddress().getPort();
assertEquals(actualPort, serverPort);
server.start();
// Set up a ZK client
CuratorFramework client = getSecureZooKeeperClient(serverPort);
client.start();
final String testPath = "/test";
final String createResult = client.create().forPath(testPath, new byte[0]);
final Stat checkExistsResult = client.checkExists().forPath(testPath);
assertEquals(createResult, testPath);
assertNotNull(checkExistsResult);
}
private static String getPath(String path) {
return new File("src/test/resources/TestZooKeeperStateServerConfigurations/" + path).getPath();
}
private CuratorFramework getSecureZooKeeperClient(final int port) {
// TODO: port being set needs to be based on port set in nifi.properties, should create client in the same
clientProperties = createSecureClientProperties(
port,
Paths.get(KEY_STORE),
STORE_TYPE,
TEST_PASSWORD,
Paths.get(TRUST_STORE),
STORE_TYPE,
TEST_PASSWORD
);
final ZooKeeperClientConfig zkClientConfig =
ZooKeeperClientConfig.createConfig(clientProperties);
return getCuratorFramework(zkClientConfig, new SecureClientZooKeeperFactory(zkClientConfig));
}
private CuratorFramework getInsecureZooKeeperClient(final Map<String, String> nifiClientProps, final String connectString) {
// set up zkClientConfig
Map<String, String> supplementedProps = nifiClientProps;
supplementedProps.put(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connectString);
NiFiProperties insecureClientProperties = NiFiProperties.createBasicNiFiProperties(null, supplementedProps);
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(insecureClientProperties);
ZookeeperFactory factory = null;
try {
factory = new DefaultZookeeperFactory();
} catch (Exception e) {
e.printStackTrace();
}
return getCuratorFramework(zkClientConfig, factory);
}
private CuratorFramework getCuratorFramework(ZooKeeperClientConfig zkClientConfig, ZookeeperFactory factory) {
final CuratorFrameworkFactory.Builder clientBuilder = CuratorFrameworkFactory.builder()
.connectString(zkClientConfig.getConnectString())
.sessionTimeoutMs(zkClientConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkClientConfig.getConnectionTimeoutMillis())
.retryPolicy(new RetryOneTime(200))
.defaultData(new byte[0])
.zookeeperFactory(factory);
return clientBuilder.build();
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.state.server;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
// This class tests the behaviors involved with the ZooKeeperStateServer::create method. The servers are not started,
// and TLS connections are not used.
public class TestZooKeeperStateServerConfigurations {
private static final String KEY_STORE = getPath("keystore.jks");
private static final String TRUST_STORE = getPath("truststore.jks");
private static final String INSECURE_ZOOKEEPER_PROPS = getPath("insecure.zookeeper.properties");
private static final String SECURE_ZOOKEEPER_PROPS = getPath("secure.zookeeper.properties");
private static final String ZOOKEEPER_PROPERTIES_FILE_KEY = "nifi.state.management.embedded.zookeeper.properties";
private static final String ZOOKEEPER_CNXN_FACTORY = "org.apache.zookeeper.server.NettyServerCnxnFactory";
private static final String KEYSTORE_PASSWORD = "passwordpassword";
private static final String TRUSTSTORE_PASSWORD = "passwordpassword";
private static final String STORE_TYPE = "JKS";
private static final Map<String, String> INSECURE_PROPS = new HashMap<String, String>() {{
put(ZOOKEEPER_PROPERTIES_FILE_KEY, INSECURE_ZOOKEEPER_PROPS);
}};
private static final Map<String, String> INSECURE_NIFI_PROPS = new HashMap<String, String>() {{
putAll(INSECURE_PROPS);
put(NiFiProperties.WEB_HTTP_PORT, "8080");
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
}};
private static final Map<String, String> SECURE_NIFI_PROPS = new HashMap<String, String>() {{
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
put(NiFiProperties.WEB_HTTPS_PORT, "8443");
put(NiFiProperties.SECURITY_KEYSTORE, KEY_STORE);
put(NiFiProperties.SECURITY_KEYSTORE_PASSWD, KEYSTORE_PASSWORD);
put(NiFiProperties.SECURITY_KEYSTORE_TYPE, STORE_TYPE);
put(NiFiProperties.SECURITY_TRUSTSTORE, TRUST_STORE);
put(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, TRUSTSTORE_PASSWORD);
put(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, STORE_TYPE);
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
}};
private NiFiProperties secureNiFiProps;
private NiFiProperties insecureNiFiProps;
private QuorumPeerConfig secureQuorumPeerConfig;
private QuorumPeerConfig insecureQuorumPeerConfig;
private Properties secureZooKeeperProps;
private Properties insecureZooKeeperProps;
@Before
public void setupWithValidProperties() throws IOException, QuorumPeerConfig.ConfigException {
// Secure properties setup
secureNiFiProps = NiFiProperties.createBasicNiFiProperties(null, SECURE_NIFI_PROPS);
Assert.assertNotNull(secureNiFiProps);
// This shows that a ZooKeeper server is created from valid NiFi properties:
final ZooKeeperStateServer secureZooKeeperStateServer = ZooKeeperStateServer.create(secureNiFiProps);
Assert.assertNotNull(secureZooKeeperStateServer);
secureQuorumPeerConfig = secureZooKeeperStateServer.getQuorumPeerConfig();
Assert.assertNotNull(secureQuorumPeerConfig);
secureZooKeeperProps = new Properties();
secureZooKeeperProps.load( FileUtils.openInputStream(new File(SECURE_ZOOKEEPER_PROPS)));
Assert.assertNotNull(secureZooKeeperProps);
// Insecure properties setup
insecureNiFiProps = NiFiProperties.createBasicNiFiProperties(null, INSECURE_NIFI_PROPS);
Assert.assertNotNull(insecureNiFiProps);
// This shows that a ZooKeeper server is created from valid NiFi properties:
final ZooKeeperStateServer insecureZooKeeperStateServer = ZooKeeperStateServer.create(insecureNiFiProps);
Assert.assertNotNull(insecureZooKeeperStateServer);
insecureQuorumPeerConfig = insecureZooKeeperStateServer.getQuorumPeerConfig();
Assert.assertNotNull(insecureQuorumPeerConfig);
insecureZooKeeperProps = new Properties();
insecureZooKeeperProps.load(FileUtils.openInputStream(new File(INSECURE_ZOOKEEPER_PROPS)));
Assert.assertNotNull(insecureZooKeeperProps);
}
@After
public void clearConnectionProperties() {
Collections.unmodifiableSet(System.getProperties().stringPropertyNames()).stream()
.filter(name -> name.startsWith("zookeeper."))
.forEach(System::clearProperty);
}
// This test shows that a ZooKeeperStateServer cannot be created from empty NiFi properties.
@Test
public void testCreateFromEmptyNiFiProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties emptyProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<>());
Assert.assertNotNull(emptyProps);
Assert.assertNull(ZooKeeperStateServer.create(emptyProps));
}
// This test shows that a ZooKeeperStateServer can be created from insecure NiFi properties.
@Test
public void testCreateFromValidInsecureNiFiProperties() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties insecureProps = NiFiProperties.createBasicNiFiProperties(null, INSECURE_PROPS);
final ZooKeeperStateServer server = ZooKeeperStateServer.create(insecureProps);
Assert.assertNotNull(server);
Assert.assertNotNull(server.getQuorumPeerConfig().getClientPortAddress());
}
// This test shows that the client can specify a secure port and that port is used:
@Test
public void testCreateWithSpecifiedSecureClientPort() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties secureProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
final ZooKeeperStateServer server = ZooKeeperStateServer.create(secureProps);
Assert.assertNotNull(server);
final QuorumPeerConfig config = server.getQuorumPeerConfig();
Assert.assertEquals(secureZooKeeperProps.getProperty("secureClientPort"), String.valueOf(config.getSecureClientPortAddress().getPort()));
}
// This shows that a secure NiFi with an secure ZooKeeper will not have an insecure client address or port:
@Test
public void testCreateRemovesInsecureClientPort() {
Assert.assertNotNull(secureZooKeeperProps.getProperty("secureClientPort"));
Assert.assertNotEquals(secureZooKeeperProps.getProperty("clientPort"), "");
Assert.assertNull(secureQuorumPeerConfig.getClientPortAddress());
}
// This test shows that a connection class is set when none is specified (QuorumPeerConfig::parseProperties sets the System property):
@Test
public void testCreateWithUnspecifiedConnectionClass() {
Assert.assertEquals(org.apache.zookeeper.server.NettyServerCnxnFactory.class.getName(), System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY));
}
// This test shows that a specified connection class is honored (QuorumPeerConfig::parseProperties sets the System property):
@Test
public void testCreateWithSpecifiedConnectionClass() throws IOException, QuorumPeerConfig.ConfigException {
final NiFiProperties secureProps = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
putAll(SECURE_NIFI_PROPS);
put(ZOOKEEPER_PROPERTIES_FILE_KEY, SECURE_ZOOKEEPER_PROPS);
}});
Assert.assertNotNull(ZooKeeperStateServer.create(secureProps));
Assert.assertEquals(ZOOKEEPER_CNXN_FACTORY, System.getProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY));
}
private static String getPath(String path) {
return new File("src/test/resources/TestZooKeeperStateServerConfigurations/" + path).getAbsolutePath();
}
}

View File

@ -96,7 +96,7 @@ public class ITSecureClientZooKeeperFactory {
createTrustStore(serverCert, "zookeeper", TEST_PASSWORD, clientTrustStore, KEYSTORE_TYPE);
createTrustStore(clientCert, "client", TEST_PASSWORD, serverTrustStore, KEYSTORE_TYPE);
clientProperties = createClientProperties(
clientProperties = createSecureClientProperties(
clientPort,
clientKeyStore,
KEYSTORE_TYPE,
@ -204,9 +204,9 @@ public class ITSecureClientZooKeeperFactory {
return secureConnectionFactory;
}
public static NiFiProperties createClientProperties(final int clientPort,
final Path keyStore, final String keyStoreType, final String keyStorePassword,
final Path trustStore, final String trustStoreType, final String trustStorePassword) {
public static NiFiProperties createSecureClientProperties(final int clientPort,
final Path keyStore, final String keyStoreType, final String keyStorePassword,
final Path trustStore, final String trustStoreType, final String trustStorePassword) {
final Properties properties = new Properties();
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, String.format("localhost:%d", clientPort));

View File

@ -0,0 +1,43 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
#
clientPort=2381
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=./target/state/zookeeper
autopurge.snapRetainCount=30
#
# Specifies the servers that are part of this zookeeper ensemble. For
# every NiFi instance running an embedded zookeeper, there needs to be
# a server entry below. For instance:
#
# server.1=nifi-node1-hostname:2888:3888
# server.2=nifi-node2-hostname:2888:3888
# server.3=nifi-node3-hostname:2888:3888
#
# The index of the server corresponds to the myid file that gets created
# in the dataDir of each node running an embedded zookeeper. See the
# administration guide for more details.

View File

@ -0,0 +1,43 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
#
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=./target/state/zookeeper
autopurge.snapRetainCount=30
#
# Specifies the servers that are part of this zookeeper ensemble. For
# every NiFi instance running an embedded zookeeper, there needs to be
# a server entry below. For instance:
#
# server.1=nifi-node1-hostname:2888:3888
# server.2=nifi-node2-hostname:2888:3888
# server.3=nifi-node3-hostname:2888:3888
#
# The index of the server corresponds to the myid file that gets created
# in the dataDir of each node running an embedded zookeeper. See the
# administration guide for more details.
server.1=nifi-node1-hostname:2888:3888

View File

@ -0,0 +1,31 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#
#
secureClientPort=2281
initLimit=10
autopurge.purgeInterval=24
syncLimit=5
tickTime=2000
dataDir=./target/state/zookeeper
autopurge.snapRetainCount=30
server.1=nifi-node1-hostname:2888:3888

View File

@ -93,6 +93,7 @@
<logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
<logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" />
<logger name="org.apache.zookeeper.server.NettyServerCnxnFactory" level="ERROR" />
<logger name="org.apache.zookeeper.server.quorum" level="ERROR" />
<logger name="org.apache.zookeeper.ZooKeeper" level="ERROR" />
<logger name="org.apache.zookeeper.server.PrepRequestProcessor" level="ERROR" />

View File

@ -27,6 +27,28 @@ tickTime=2000
dataDir=./state/zookeeper
autopurge.snapRetainCount=30
# Embedded/distributed ZK TLS connection support can be activated by setting these properties at minimum:
#
# secureClientPort=2281
# serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
# Most TLS configurations will set these values as well:
#
# ssl.keyStore.location=/example/path/to/key-store.jks
# ssl.keyStore.password=change this value to the actual value in your installation
# ssl.trustStore.location=/example/path/to/trust-store.jks
# ssl.trustStore.password=change this value to the actual value in your installation
# ssl.hostnameVerification=false
#
# Note that many ZK parameters can set as Java system properties, refer to the ZK admin guide for details:
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_configuration
# Other common settings:
#
# client.portUnification=true
# admin.enableServer=false
# The server string has changed as of 3.5.5 and the client port is now specified at the end of the server string:
# https://zookeeper.apache.org/doc/r3.5.5/zookeeperReconfig.html#sc_reconfig_clientport
#