mirror of https://github.com/apache/nifi.git
NIFI-7819 - Added ZooKeeperStateProvider TLS properties.
- Added tests for TLS with ZooKeeperStateProvider. - Added docs to administration guide. - Small fixes for PR comments. - Changed the ZooKeeperStateProvider to receive configuration from the nifi.properties file. Uses the Zookeeper TLS properties or if they are not declared, uses the standard NiFi TLS properties. - Updated administration-guide. - Fixed some boolean literalsl. Set the ZooKeeper watcher to null. Removed stacktrace prints to standard out. Added getPreferredProperty for key/truststore types. - Removing some unused code. Fixing up NiFi properties methods. Removed whitespace. - Added some tests for getPreferredProperty(). - Checkstyle fixes. - Passing through nifi properties to the state provider using an annotation to avoid ZooKeeper references in the StateManagerProvider. - Fixed comment. - Added CLIENT_SECURE property to isZooKeeperTlsConfigurationPresent() check. - Small change to getPreferredProperty, added more tests. - Added checkstyle fix. - Moved StateProviderContext to nifi-framework-api. - Changed combine properties to handle null NiFiProperties. Inject NiFiProperties object for tests. - Checkstyle fix. - Changed the connect string in state-management.xml to be required. Rearranged order of property validation to validate before initialization. - Rearranged the way ZooKeeperClientConfig is initialized and added a non blank validator to connect string. - Minor change to ZooKeeperClientConfig member variable set and get. This closes #4613. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
badcfe1ab7
commit
479ee6e3db
|
@ -1335,6 +1335,14 @@ public abstract class NiFiProperties {
|
||||||
return Boolean.parseBoolean(clientSecure);
|
return Boolean.parseBoolean(clientSecure);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isZooKeeperTlsConfigurationPresent() {
|
||||||
|
return StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE))
|
||||||
|
&& StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE))
|
||||||
|
&& getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD) != null
|
||||||
|
&& StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE))
|
||||||
|
&& getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD) != null;
|
||||||
|
}
|
||||||
|
|
||||||
public int size() {
|
public int size() {
|
||||||
return getPropertyKeys().size();
|
return getPropertyKeys().size();
|
||||||
}
|
}
|
||||||
|
|
|
@ -281,4 +281,50 @@ public class NiFiPropertiesTest {
|
||||||
// Assert specific values are used:
|
// Assert specific values are used:
|
||||||
assertEquals(properties.getWebMaxContentSize(), size);
|
assertEquals(properties.getWebMaxContentSize(), size);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
@Test
|
||||||
|
public void testIsZooKeeperTlsConfigurationPresent() {
|
||||||
|
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
|
||||||
|
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, "/a/keystore/filepath/keystore.jks");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, "password");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "JKS");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, "/a/truststore/filepath/truststore.jks");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, "password");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "JKS");
|
||||||
|
}});
|
||||||
|
|
||||||
|
assertTrue(properties.isZooKeeperClientSecure());
|
||||||
|
assertTrue(properties.isZooKeeperTlsConfigurationPresent());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSomeZooKeeperTlsConfigurationIsMissing() {
|
||||||
|
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
|
||||||
|
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, "password");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "JKS");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, "/a/truststore/filepath/truststore.jks");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "JKS");
|
||||||
|
}});
|
||||||
|
|
||||||
|
assertTrue(properties.isZooKeeperClientSecure());
|
||||||
|
assertFalse(properties.isZooKeeperTlsConfigurationPresent());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZooKeeperTlsPasswordsBlank() {
|
||||||
|
NiFiProperties properties = NiFiProperties.createBasicNiFiProperties(null, new HashMap<String, String>() {{
|
||||||
|
put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, "/a/keystore/filepath/keystore.jks");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, "");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "JKS");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, "/a/truststore/filepath/truststore.jks");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, "");
|
||||||
|
put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "JKS");
|
||||||
|
}});
|
||||||
|
|
||||||
|
assertTrue(properties.isZooKeeperClientSecure());
|
||||||
|
assertTrue(properties.isZooKeeperTlsConfigurationPresent());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -2188,7 +2188,7 @@ As discussed above, communications with ZooKeeper are insecure by default. The s
|
||||||
certificate-based authentication with a TLS-enabled ZooKeeper server (available since ZooKeeper's 3.5.x releases). Instructions for enabling TLS on an external
|
certificate-based authentication with a TLS-enabled ZooKeeper server (available since ZooKeeper's 3.5.x releases). Instructions for enabling TLS on an external
|
||||||
ZooKeeper ensemble can be found in the link:https://zookeeper.apache.org/doc/r3.5.5/zookeeperAdmin.html#sc_authOptions[ZooKeeper Administrator's Guide].
|
ZooKeeper ensemble can be found in the link:https://zookeeper.apache.org/doc/r3.5.5/zookeeperAdmin.html#sc_authOptions[ZooKeeper Administrator's Guide].
|
||||||
|
|
||||||
Once you have a TLS-enabled instance of ZooKeeper, TLS can be enabled for the NiFi client by creating a keystore and truststore and configuring the following properties
|
Once you have a TLS-enabled instance of ZooKeeper, TLS can be enabled for the NiFi client by setting `nifi.zookeeper.client.secure=true`. By default, the ZooKeeper client will use the existing `nifi.security.*` properties for the keystore and truststore. If you require separate TLS configuration for ZooKeeper, you can create a separate keystore and truststore and configure the following properties
|
||||||
in the _$NIFI_HOME/conf/nifi.properties_ file:
|
in the _$NIFI_HOME/conf/nifi.properties_ file:
|
||||||
|
|
||||||
[options="header,footer"]
|
[options="header,footer"]
|
||||||
|
@ -2203,10 +2203,8 @@ in the _$NIFI_HOME/conf/nifi.properties_ file:
|
||||||
|`nifi.zookeeper.security.truststorePasswd`|The password for the Truststore.|_none_
|
|`nifi.zookeeper.security.truststorePasswd`|The password for the Truststore.|_none_
|
||||||
|===
|
|===
|
||||||
|
|
||||||
These can be different from or identical to the configuration values in the `nifi.security.\*` security properties. If they are identical, the keystore and truststore must
|
Whether using the default security properties or the ZooKeeper specific properties, the keystore and truststores must contain the appropriate keys and certificates for use with ZooKeeper (i.e., the keys and certificates need to align with the ZooKeeper configuration either way).
|
||||||
still contain the appropriate keys and certificates for use with ZooKeeper (i.e., the keys and certificates need to align with the ZooKeeper configuration either way).
|
NiFi's TLS Toolkit can be used to help generate the keystore and truststore used for ZooKeeper client/server access.
|
||||||
If not re-using the `nifi.security.*` keystore and truststore, NiFi's TLS Toolkit can still be used to help generate the keystore and truststore used for ZooKeeper client
|
|
||||||
access.
|
|
||||||
|
|
||||||
After updating the above properties and starting NiFi, network communication with ZooKeeper will be secure and ZooKeeper will now use the NiFi node's certificate principal
|
After updating the above properties and starting NiFi, network communication with ZooKeeper will be secure and ZooKeeper will now use the NiFi node's certificate principal
|
||||||
when authenticating access. This will be reflected in log messages like the following on the ZooKeeper server:
|
when authenticating access. This will be reflected in log messages like the following on the ZooKeeper server:
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.components.state.annotation;
|
||||||
|
|
||||||
|
import java.lang.annotation.Documented;
|
||||||
|
import java.lang.annotation.ElementType;
|
||||||
|
import java.lang.annotation.Inherited;
|
||||||
|
import java.lang.annotation.Retention;
|
||||||
|
import java.lang.annotation.RetentionPolicy;
|
||||||
|
import java.lang.annotation.Target;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
@Documented
|
||||||
|
@Target({ElementType.FIELD, ElementType.METHOD})
|
||||||
|
@Retention(RetentionPolicy.RUNTIME)
|
||||||
|
@Inherited
|
||||||
|
public @interface StateProviderContext {
|
||||||
|
}
|
|
@ -0,0 +1,64 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.nifi.controller.cluster;
|
||||||
|
|
||||||
|
import org.apache.curator.utils.ZookeeperFactory;
|
||||||
|
import org.apache.zookeeper.Watcher;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.admin.ZooKeeperAdmin;
|
||||||
|
import org.apache.zookeeper.client.ZKClientConfig;
|
||||||
|
import org.apache.zookeeper.common.ClientX509Util;
|
||||||
|
import org.apache.zookeeper.common.X509Util;
|
||||||
|
|
||||||
|
public class SecureClientZooKeeperFactory implements ZookeeperFactory {
|
||||||
|
|
||||||
|
public static final String NETTY_CLIENT_CNXN_SOCKET = org.apache.zookeeper.ClientCnxnSocketNetty.class.getName();
|
||||||
|
|
||||||
|
private ZKClientConfig zkSecureClientConfig;
|
||||||
|
|
||||||
|
public SecureClientZooKeeperFactory(final ZooKeeperClientConfig zkConfig) {
|
||||||
|
this.zkSecureClientConfig = new ZKClientConfig();
|
||||||
|
|
||||||
|
// Netty is required for the secure client config.
|
||||||
|
final String cnxnSocket = zkConfig.getConnectionSocket();
|
||||||
|
if (!NETTY_CLIENT_CNXN_SOCKET.equals(cnxnSocket)) {
|
||||||
|
throw new IllegalArgumentException(String.format("connection factory set to '%s', %s required", String.valueOf(cnxnSocket), NETTY_CLIENT_CNXN_SOCKET));
|
||||||
|
}
|
||||||
|
zkSecureClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, cnxnSocket);
|
||||||
|
|
||||||
|
// This should never happen but won't get checked elsewhere.
|
||||||
|
final boolean clientSecure = zkConfig.isClientSecure();
|
||||||
|
if (!clientSecure) {
|
||||||
|
throw new IllegalStateException(String.format("%s set to '%b', expected true", ZKClientConfig.SECURE_CLIENT, clientSecure));
|
||||||
|
}
|
||||||
|
zkSecureClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, String.valueOf(clientSecure));
|
||||||
|
|
||||||
|
final X509Util clientX509util = new ClientX509Util();
|
||||||
|
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreLocationProperty(), zkConfig.getKeyStore());
|
||||||
|
zkSecureClientConfig.setProperty(clientX509util.getSslKeystoreTypeProperty(), zkConfig.getKeyStoreType());
|
||||||
|
zkSecureClientConfig.setProperty(clientX509util.getSslKeystorePasswdProperty(), zkConfig.getKeyStorePassword());
|
||||||
|
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreLocationProperty(), zkConfig.getTrustStore());
|
||||||
|
zkSecureClientConfig.setProperty(clientX509util.getSslTruststoreTypeProperty(), zkConfig.getTrustStoreType());
|
||||||
|
zkSecureClientConfig.setProperty(clientX509util.getSslTruststorePasswdProperty(), zkConfig.getTrustStorePassword());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
|
||||||
|
return new ZooKeeperAdmin(connectString, sessionTimeout, watcher, zkSecureClientConfig);
|
||||||
|
}
|
||||||
|
}
|
|
@ -162,12 +162,12 @@ public class ZooKeeperClientConfig {
|
||||||
final long connectionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
|
final long connectionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
|
||||||
final String rootPath = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
|
final String rootPath = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
|
||||||
final boolean clientSecure = nifiProperties.isZooKeeperClientSecure();
|
final boolean clientSecure = nifiProperties.isZooKeeperClientSecure();
|
||||||
final String keyStore = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE);
|
final String keyStore = getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, NiFiProperties.SECURITY_KEYSTORE);
|
||||||
final String keyStoreType = StringUtils.stripToNull(nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE));
|
final String keyStoreType = StringUtils.stripToNull(getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, NiFiProperties.SECURITY_KEYSTORE_TYPE));
|
||||||
final String keyStorePassword = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD);
|
final String keyStorePassword = getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, NiFiProperties.SECURITY_KEYSTORE_PASSWD);
|
||||||
final String trustStore = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE);
|
final String trustStore = getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, NiFiProperties.SECURITY_TRUSTSTORE);
|
||||||
final String trustStoreType = StringUtils.stripToNull(nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE));
|
final String trustStoreType = StringUtils.stripToNull(getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, NiFiProperties.SECURITY_TRUSTSTORE_TYPE));
|
||||||
final String trustStorePassword = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD);
|
final String trustStorePassword = getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, NiFiProperties.SECURITY_TRUSTSTORE_PASSWD);
|
||||||
final String authType = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_AUTH_TYPE, NiFiProperties.DEFAULT_ZOOKEEPER_AUTH_TYPE);
|
final String authType = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_AUTH_TYPE, NiFiProperties.DEFAULT_ZOOKEEPER_AUTH_TYPE);
|
||||||
final String authPrincipal = nifiProperties.getKerberosServicePrincipal();
|
final String authPrincipal = nifiProperties.getKerberosServicePrincipal();
|
||||||
final String removeHostFromPrincipal = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL,
|
final String removeHostFromPrincipal = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL,
|
||||||
|
@ -210,6 +210,23 @@ public class ZooKeeperClientConfig {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves the preferred property value if it's set, otherwise retrieves the default property. If neither are set, returns null.
|
||||||
|
* @param properties The NiFi properties configuration.
|
||||||
|
* @param preferredPropertyName The preferred property to get from NiFi properties.
|
||||||
|
* @param defaultPropertyName The backup property to get from NiFi properties if the preferred property is not present.
|
||||||
|
* @return Returns the property in order of preference.
|
||||||
|
*/
|
||||||
|
private static String getPreferredProperty(final NiFiProperties properties, final String preferredPropertyName, final String defaultPropertyName) {
|
||||||
|
String retrievedProperty = properties.getProperty(preferredPropertyName);
|
||||||
|
|
||||||
|
if(StringUtils.isBlank(retrievedProperty)) {
|
||||||
|
retrievedProperty = properties.getProperty(defaultPropertyName);
|
||||||
|
}
|
||||||
|
|
||||||
|
return retrievedProperty;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Takes a given connect string and splits it by ',' character. For each
|
* Takes a given connect string and splits it by ',' character. For each
|
||||||
* split result trims whitespace then splits by ':' character. For each
|
* split result trims whitespace then splits by ':' character. For each
|
||||||
|
|
|
@ -19,6 +19,8 @@ package org.apache.nifi.controller.state.manager;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -45,6 +47,7 @@ import org.apache.nifi.controller.state.StandardStateManager;
|
||||||
import org.apache.nifi.controller.state.StandardStateProviderInitializationContext;
|
import org.apache.nifi.controller.state.StandardStateProviderInitializationContext;
|
||||||
import org.apache.nifi.controller.state.config.StateManagerConfiguration;
|
import org.apache.nifi.controller.state.config.StateManagerConfiguration;
|
||||||
import org.apache.nifi.controller.state.config.StateProviderConfiguration;
|
import org.apache.nifi.controller.state.config.StateProviderConfiguration;
|
||||||
|
import org.apache.nifi.components.state.annotation.StateProviderContext;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.nar.ExtensionManager;
|
import org.apache.nifi.nar.ExtensionManager;
|
||||||
import org.apache.nifi.nar.NarCloseable;
|
import org.apache.nifi.nar.NarCloseable;
|
||||||
|
@ -66,11 +69,13 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(StandardStateManagerProvider.class);
|
private static final Logger logger = LoggerFactory.getLogger(StandardStateManagerProvider.class);
|
||||||
|
|
||||||
private static StateManagerProvider provider;
|
private static StateManagerProvider provider;
|
||||||
|
private static NiFiProperties nifiProperties;
|
||||||
|
|
||||||
private final ConcurrentMap<String, StateManager> stateManagers = new ConcurrentHashMap<>();
|
private final ConcurrentMap<String, StateManager> stateManagers = new ConcurrentHashMap<>();
|
||||||
private final StateProvider localStateProvider;
|
private final StateProvider localStateProvider;
|
||||||
private final StateProvider clusterStateProvider;
|
private final StateProvider clusterStateProvider;
|
||||||
|
|
||||||
|
|
||||||
private StandardStateManagerProvider(final StateProvider localStateProvider, final StateProvider clusterStateProvider) {
|
private StandardStateManagerProvider(final StateProvider localStateProvider, final StateProvider clusterStateProvider) {
|
||||||
this.localStateProvider = localStateProvider;
|
this.localStateProvider = localStateProvider;
|
||||||
this.clusterStateProvider = clusterStateProvider;
|
this.clusterStateProvider = clusterStateProvider;
|
||||||
|
@ -78,6 +83,8 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
|
|
||||||
public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager,
|
public static synchronized StateManagerProvider create(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager,
|
||||||
final ParameterLookup parameterLookup) throws ConfigParseException, IOException {
|
final ParameterLookup parameterLookup) throws ConfigParseException, IOException {
|
||||||
|
nifiProperties = properties;
|
||||||
|
|
||||||
if (provider != null) {
|
if (provider != null) {
|
||||||
return provider;
|
return provider;
|
||||||
}
|
}
|
||||||
|
@ -105,14 +112,12 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
return createStateProvider(configFile, Scope.LOCAL, properties, variableRegistry, extensionManager, parameterLookup);
|
return createStateProvider(configFile, Scope.LOCAL, properties, variableRegistry, extensionManager, parameterLookup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static StateProvider createClusteredStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager,
|
private static StateProvider createClusteredStateProvider(final NiFiProperties properties, final VariableRegistry variableRegistry, final ExtensionManager extensionManager,
|
||||||
final ParameterLookup parameterLookup) throws IOException, ConfigParseException {
|
final ParameterLookup parameterLookup) throws IOException, ConfigParseException {
|
||||||
final File configFile = properties.getStateManagementConfigFile();
|
final File configFile = properties.getStateManagementConfigFile();
|
||||||
return createStateProvider(configFile, Scope.CLUSTER, properties, variableRegistry, extensionManager, parameterLookup);
|
return createStateProvider(configFile, Scope.CLUSTER, properties, variableRegistry, extensionManager, parameterLookup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private static StateProvider createStateProvider(final File configFile, final Scope scope, final NiFiProperties properties, final VariableRegistry variableRegistry,
|
private static StateProvider createStateProvider(final File configFile, final Scope scope, final NiFiProperties properties, final VariableRegistry variableRegistry,
|
||||||
final ExtensionManager extensionManager, final ParameterLookup parameterLookup) throws ConfigParseException, IOException {
|
final ExtensionManager extensionManager, final ParameterLookup parameterLookup) throws ConfigParseException, IOException {
|
||||||
final String providerId;
|
final String providerId;
|
||||||
|
@ -193,6 +198,15 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
+ " is configured to use scope " + scope);
|
+ " is configured to use scope " + scope);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final SSLContext sslContext;
|
||||||
|
StandardTlsConfiguration standardTlsConfiguration = StandardTlsConfiguration.fromNiFiProperties(properties);
|
||||||
|
try {
|
||||||
|
sslContext = SslContextFactory.createSslContext(standardTlsConfiguration);
|
||||||
|
} catch (TlsException e) {
|
||||||
|
logger.error("Encountered an error configuring TLS for state manager: ", e);
|
||||||
|
throw new IllegalStateException("Error configuring TLS for state manager", e);
|
||||||
|
}
|
||||||
|
|
||||||
//create variable registry
|
//create variable registry
|
||||||
final ParameterParser parser = new ExpressionLanguageAwareParameterParser();
|
final ParameterParser parser = new ExpressionLanguageAwareParameterParser();
|
||||||
final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>();
|
final Map<PropertyDescriptor, PropertyValue> propertyMap = new HashMap<>();
|
||||||
|
@ -206,6 +220,7 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
|
|
||||||
propertyStringMap.put(descriptor, configuration);
|
propertyStringMap.put(descriptor, configuration);
|
||||||
}
|
}
|
||||||
|
|
||||||
//set properties from actual configuration
|
//set properties from actual configuration
|
||||||
for (final Map.Entry<String, String> entry : providerConfig.getProperties().entrySet()) {
|
for (final Map.Entry<String, String> entry : providerConfig.getProperties().entrySet()) {
|
||||||
final PropertyDescriptor descriptor = provider.getPropertyDescriptor(entry.getKey());
|
final PropertyDescriptor descriptor = provider.getPropertyDescriptor(entry.getKey());
|
||||||
|
@ -217,14 +232,6 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
propertyMap.put(descriptor, new StandardPropertyValue(entry.getValue(),null, parameterLookup, variableRegistry));
|
propertyMap.put(descriptor, new StandardPropertyValue(entry.getValue(),null, parameterLookup, variableRegistry));
|
||||||
}
|
}
|
||||||
|
|
||||||
final SSLContext sslContext;
|
|
||||||
try {
|
|
||||||
sslContext = SslContextFactory.createSslContext(StandardTlsConfiguration.fromNiFiProperties(properties));
|
|
||||||
} catch (TlsException e) {
|
|
||||||
logger.error("Encountered an error configuring TLS for state manager: ", e);
|
|
||||||
throw new IllegalStateException("Error configuring TLS for state manager", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
final ComponentLog logger = new SimpleProcessLogger(providerId, provider);
|
final ComponentLog logger = new SimpleProcessLogger(providerId, provider);
|
||||||
final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerId, propertyMap, sslContext, logger);
|
final StateProviderInitializationContext initContext = new StandardStateProviderInitializationContext(providerId, propertyMap, sslContext, logger);
|
||||||
|
|
||||||
|
@ -253,6 +260,39 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
return provider;
|
return provider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Inject NiFi Properties to state providers that use the StateProviderContext annotation
|
||||||
|
private static void performMethodInjection(final Object instance, final Class stateProviderClass) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
|
||||||
|
for (final Method method : stateProviderClass.getMethods()) {
|
||||||
|
if (method.isAnnotationPresent(StateProviderContext.class)) {
|
||||||
|
// make the method accessible
|
||||||
|
final boolean isAccessible = method.isAccessible();
|
||||||
|
method.setAccessible(true);
|
||||||
|
|
||||||
|
try {
|
||||||
|
final Class<?>[] argumentTypes = method.getParameterTypes();
|
||||||
|
|
||||||
|
// look for setters (single argument)
|
||||||
|
if (argumentTypes.length == 1) {
|
||||||
|
final Class<?> argumentType = argumentTypes[0];
|
||||||
|
|
||||||
|
// look for well known types
|
||||||
|
if (NiFiProperties.class.isAssignableFrom(argumentType)) {
|
||||||
|
// nifi properties injection
|
||||||
|
method.invoke(instance, nifiProperties);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
method.setAccessible(isAccessible);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final Class parentClass = stateProviderClass.getSuperclass();
|
||||||
|
if (parentClass != null && StateProvider.class.isAssignableFrom(parentClass)) {
|
||||||
|
performMethodInjection(instance, parentClass);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static StateProvider instantiateStateProvider(final ExtensionManager extensionManager, final String type) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
|
private static StateProvider instantiateStateProvider(final ExtensionManager extensionManager, final String type) throws ClassNotFoundException, InstantiationException, IllegalAccessException {
|
||||||
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
|
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
|
||||||
try {
|
try {
|
||||||
|
@ -270,7 +310,13 @@ public class StandardStateManagerProvider implements StateManagerProvider {
|
||||||
|
|
||||||
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
|
Thread.currentThread().setContextClassLoader(detectedClassLoaderForType);
|
||||||
final Class<? extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class);
|
final Class<? extends StateProvider> mgrClass = rawClass.asSubclass(StateProvider.class);
|
||||||
return withNarClassLoader(mgrClass.newInstance());
|
StateProvider provider = mgrClass.newInstance();
|
||||||
|
try {
|
||||||
|
performMethodInjection(provider, mgrClass);
|
||||||
|
} catch (InvocationTargetException e) {
|
||||||
|
logger.error(String.format("Failed to inject nifi.properties to the '%s' state provider.", type), e);
|
||||||
|
}
|
||||||
|
return withNarClassLoader(provider);
|
||||||
} finally {
|
} finally {
|
||||||
if (ctxClassLoader != null) {
|
if (ctxClassLoader != null) {
|
||||||
Thread.currentThread().setContextClassLoader(ctxClassLoader);
|
Thread.currentThread().setContextClassLoader(ctxClassLoader);
|
||||||
|
|
|
@ -17,6 +17,36 @@
|
||||||
|
|
||||||
package org.apache.nifi.controller.state.providers.zookeeper;
|
package org.apache.nifi.controller.state.providers.zookeeper;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.components.state.annotation.StateProviderContext;
|
||||||
|
import org.apache.nifi.components.AllowableValue;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
|
import org.apache.nifi.components.Validator;
|
||||||
|
import org.apache.nifi.components.state.Scope;
|
||||||
|
import org.apache.nifi.components.state.StateMap;
|
||||||
|
import org.apache.nifi.components.state.StateProviderInitializationContext;
|
||||||
|
import org.apache.nifi.components.state.exception.StateTooLargeException;
|
||||||
|
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
|
||||||
|
import org.apache.nifi.controller.cluster.SecureClientZooKeeperFactory;
|
||||||
|
import org.apache.nifi.controller.state.StandardStateMap;
|
||||||
|
import org.apache.nifi.controller.state.providers.AbstractStateProvider;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.KeeperException.Code;
|
||||||
|
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||||
|
import org.apache.zookeeper.ZKUtil;
|
||||||
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.client.ConnectStringParser;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
|
@ -27,33 +57,10 @@ import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.apache.nifi.components.AllowableValue;
|
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
|
||||||
import org.apache.nifi.components.ValidationContext;
|
|
||||||
import org.apache.nifi.components.ValidationResult;
|
|
||||||
import org.apache.nifi.components.Validator;
|
|
||||||
import org.apache.nifi.components.state.Scope;
|
|
||||||
import org.apache.nifi.components.state.StateMap;
|
|
||||||
import org.apache.nifi.components.state.StateProviderInitializationContext;
|
|
||||||
import org.apache.nifi.components.state.exception.StateTooLargeException;
|
|
||||||
import org.apache.nifi.controller.state.StandardStateMap;
|
|
||||||
import org.apache.nifi.controller.state.providers.AbstractStateProvider;
|
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
|
||||||
import org.apache.zookeeper.CreateMode;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.apache.zookeeper.KeeperException.Code;
|
|
||||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
|
||||||
import org.apache.zookeeper.WatchedEvent;
|
|
||||||
import org.apache.zookeeper.Watcher;
|
|
||||||
import org.apache.zookeeper.ZKUtil;
|
|
||||||
import org.apache.zookeeper.ZooDefs.Ids;
|
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
|
||||||
import org.apache.zookeeper.client.ConnectStringParser;
|
|
||||||
import org.apache.zookeeper.data.ACL;
|
|
||||||
import org.apache.zookeeper.data.Stat;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ZooKeeperStateProvider utilizes a ZooKeeper based store, whether provided internally via configuration and enabling of the {@link org.apache.nifi.controller.state.server.ZooKeeperStateServer}
|
* ZooKeeperStateProvider utilizes a ZooKeeper based store, whether provided internally via configuration and enabling of the {@link org.apache.nifi.controller.state.server.ZooKeeperStateServer}
|
||||||
|
@ -61,7 +68,9 @@ import org.apache.zookeeper.data.Stat;
|
||||||
* consistency across configuration interactions.
|
* consistency across configuration interactions.
|
||||||
*/
|
*/
|
||||||
public class ZooKeeperStateProvider extends AbstractStateProvider {
|
public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperStateProvider.class);
|
||||||
private static final int ONE_MB = 1024 * 1024;
|
private static final int ONE_MB = 1024 * 1024;
|
||||||
|
private NiFiProperties nifiProperties;
|
||||||
|
|
||||||
static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client.");
|
static final AllowableValue OPEN_TO_WORLD = new AllowableValue("Open", "Open", "ZNodes will be open to any ZooKeeper client.");
|
||||||
static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly",
|
static final AllowableValue CREATOR_ONLY = new AllowableValue("CreatorOnly", "CreatorOnly",
|
||||||
|
@ -83,7 +92,8 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid Connect String").valid(true).build();
|
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid Connect String").valid(true).build();
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.required(false)
|
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
|
||||||
|
.required(true)
|
||||||
.build();
|
.build();
|
||||||
static final PropertyDescriptor SESSION_TIMEOUT = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor SESSION_TIMEOUT = new PropertyDescriptor.Builder()
|
||||||
.name("Session Timeout")
|
.name("Session Timeout")
|
||||||
|
@ -118,10 +128,15 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
private byte[] auth;
|
private byte[] auth;
|
||||||
private List<ACL> acl;
|
private List<ACL> acl;
|
||||||
|
|
||||||
|
private ZooKeeperClientConfig zooKeeperClientConfig;
|
||||||
|
|
||||||
public ZooKeeperStateProvider() {
|
public ZooKeeperStateProvider() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@StateProviderContext
|
||||||
|
public void setNiFiProperties(NiFiProperties properties) {
|
||||||
|
this.nifiProperties = properties;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
|
@ -133,7 +148,6 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void init(final StateProviderInitializationContext context) {
|
public synchronized void init(final StateProviderInitializationContext context) {
|
||||||
connectionString = context.getProperty(CONNECTION_STRING).getValue();
|
connectionString = context.getProperty(CONNECTION_STRING).getValue();
|
||||||
|
@ -147,6 +161,32 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Combine properties from NiFiProperties and additional properties, allowing the additional properties to override settings
|
||||||
|
* in the given NiFiProperties.
|
||||||
|
* @param nifiProps A NiFiProperties to be combined with some additional properties
|
||||||
|
* @param additionalProperties Additional properties that can be used to override properties in the given NiFiProperties
|
||||||
|
* @return NiFiProperties that contains the combined properties
|
||||||
|
*/
|
||||||
|
static NiFiProperties combineProperties(NiFiProperties nifiProps, Properties additionalProperties) {
|
||||||
|
return new NiFiProperties() {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getProperty(String key) {
|
||||||
|
// Get the additional properties as preference over the NiFiProperties value. Will return null if the property
|
||||||
|
// is not available through either object.
|
||||||
|
return additionalProperties.getProperty(key, nifiProps != null ? nifiProps.getProperty(key) : null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getPropertyKeys() {
|
||||||
|
Set<String> prop = additionalProperties.keySet().stream().map(key -> (String) key).collect(Collectors.toSet());
|
||||||
|
prop.addAll(nifiProps.getPropertyKeys());
|
||||||
|
return prop;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void shutdown() {
|
public synchronized void shutdown() {
|
||||||
if (zooKeeper != null) {
|
if (zooKeeper != null) {
|
||||||
|
@ -162,16 +202,26 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
|
|
||||||
// visible for testing
|
// visible for testing
|
||||||
synchronized ZooKeeper getZooKeeper() throws IOException {
|
synchronized ZooKeeper getZooKeeper() throws IOException {
|
||||||
|
|
||||||
|
ZooKeeperClientConfig clientConfig = getZooKeeperConfig();
|
||||||
|
|
||||||
if (zooKeeper != null && !zooKeeper.getState().isAlive()) {
|
if (zooKeeper != null && !zooKeeper.getState().isAlive()) {
|
||||||
invalidateClient();
|
invalidateClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (zooKeeper == null) {
|
if (zooKeeper == null) {
|
||||||
zooKeeper = new ZooKeeper(connectionString, timeoutMillis, new Watcher() {
|
if(clientConfig != null && clientConfig.isClientSecure()) {
|
||||||
@Override
|
SecureClientZooKeeperFactory factory = new SecureClientZooKeeperFactory(clientConfig);
|
||||||
public void process(WatchedEvent event) {
|
try {
|
||||||
|
zooKeeper = factory.newZooKeeper(connectionString, timeoutMillis, null, true);
|
||||||
|
logger.info("Secure Zookeeper client initialized successfully.");
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("Secure Zookeeper configuration failed!", e);
|
||||||
|
invalidateClient();
|
||||||
}
|
}
|
||||||
});
|
} else {
|
||||||
|
zooKeeper = new ZooKeeper(connectionString, timeoutMillis, null);
|
||||||
|
}
|
||||||
|
|
||||||
if (auth != null) {
|
if (auth != null) {
|
||||||
zooKeeper.addAuthInfo("digest", auth);
|
zooKeeper.addAuthInfo("digest", auth);
|
||||||
|
@ -181,6 +231,20 @@ public class ZooKeeperStateProvider extends AbstractStateProvider {
|
||||||
return zooKeeper;
|
return zooKeeper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ZooKeeperClientConfig getZooKeeperConfig() {
|
||||||
|
if(zooKeeperClientConfig != null) {
|
||||||
|
return zooKeeperClientConfig;
|
||||||
|
} else {
|
||||||
|
Properties stateProviderProperties = new Properties();
|
||||||
|
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_SESSION_TIMEOUT, String.valueOf(timeoutMillis));
|
||||||
|
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, String.valueOf(timeoutMillis));
|
||||||
|
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, rootNode);
|
||||||
|
stateProviderProperties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, connectionString);
|
||||||
|
zooKeeperClientConfig = ZooKeeperClientConfig.createConfig(combineProperties(nifiProperties, stateProviderProperties));
|
||||||
|
return zooKeeperClientConfig;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized void invalidateClient() {
|
private synchronized void invalidateClient() {
|
||||||
shutdown();
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,12 +24,21 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
public class ZooKeeperClientConfigTest {
|
public class ZooKeeperClientConfigTest {
|
||||||
|
|
||||||
|
private static final String LOCAL_CONNECT_STRING = "local:1234";
|
||||||
|
private static final String ZOOKEEPER_STORE_TYPE = "JKS";
|
||||||
|
private static final String ZOOKEEPER_KEYSTORE = "/zooKeeperKeystore.jks";
|
||||||
|
private static final String ZOOKEEPER_TRUSTSTORE = "/zooKeeperTruststore.jks";
|
||||||
|
private static final String DEFAULT_KEYSTORE = "/defaultKeystore.p12";
|
||||||
|
private static final String DEFAULT_TRUSTSTORE = "/defaultTruststore.p12";
|
||||||
|
private static final String DEFAULT_STORE_TYPE = "PKCS12";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEasyCase(){
|
public void testEasyCase(){
|
||||||
final String input = "local:1234";
|
final String input = "local:1234";
|
||||||
|
@ -41,7 +50,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
public void testValidFunkyInput(){
|
public void testValidFunkyInput(){
|
||||||
final String input = "local: 1234 ";
|
final String input = "local: 1234 ";
|
||||||
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
|
final String cleanedInput = ZooKeeperClientConfig.cleanConnectString(input);
|
||||||
assertEquals("local:1234", cleanedInput);
|
assertEquals(LOCAL_CONNECT_STRING, cleanedInput);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(expected = IllegalStateException.class)
|
@Test(expected = IllegalStateException.class)
|
||||||
|
@ -83,8 +92,8 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testValidClientSecureTrue() {
|
public void testValidClientSecureTrue() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, Boolean.TRUE.toString());
|
||||||
|
|
||||||
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
assertTrue(zkClientConfig.isClientSecure());
|
assertTrue(zkClientConfig.isClientSecure());
|
||||||
|
@ -94,8 +103,8 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testValidClientSecureFalse() {
|
public void testValidClientSecureFalse() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "false");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, Boolean.FALSE.toString());
|
||||||
|
|
||||||
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
assertFalse(zkClientConfig.isClientSecure());
|
assertFalse(zkClientConfig.isClientSecure());
|
||||||
|
@ -105,7 +114,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testValidClientSecureEmpty() {
|
public void testValidClientSecureEmpty() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "");
|
||||||
|
|
||||||
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
@ -116,7 +125,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testValidClientSecureSpaces() {
|
public void testValidClientSecureSpaces() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, " true ");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, " true ");
|
||||||
|
|
||||||
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
@ -127,8 +136,8 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testValidClientSecureUpperCase() {
|
public void testValidClientSecureUpperCase() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "TRUE");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, Boolean.TRUE.toString());
|
||||||
ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
|
||||||
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
@ -139,7 +148,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test(expected = RuntimeException.class)
|
@Test(expected = RuntimeException.class)
|
||||||
public void testInvalidClientSecure() {
|
public void testInvalidClientSecure() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "meh");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "meh");
|
||||||
ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
}
|
}
|
||||||
|
@ -148,7 +157,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
public void testKeyStoreTypes() {
|
public void testKeyStoreTypes() {
|
||||||
final String storeType = "JKS";
|
final String storeType = "JKS";
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, storeType);
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, storeType);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, storeType);
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, storeType);
|
||||||
|
|
||||||
|
@ -161,7 +170,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
public void testKeyStoreTypesSpaces() {
|
public void testKeyStoreTypesSpaces() {
|
||||||
final String storeType = " JKS ";
|
final String storeType = " JKS ";
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, storeType);
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, storeType);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, storeType);
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, storeType);
|
||||||
|
|
||||||
|
@ -174,7 +183,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testEmptyKeyStoreTypes() {
|
public void testEmptyKeyStoreTypes() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "");
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "");
|
||||||
|
|
||||||
|
@ -186,7 +195,7 @@ public class ZooKeeperClientConfigTest {
|
||||||
@Test
|
@Test
|
||||||
public void testBlankKeyStoreTypes() {
|
public void testBlankKeyStoreTypes() {
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, "local:1234");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, " ");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, " ");
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, " ");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, " ");
|
||||||
|
|
||||||
|
@ -201,4 +210,71 @@ public class ZooKeeperClientConfigTest {
|
||||||
assertEquals("org.apache.zookeeper.ClientCnxnSocketNIO", ZooKeeperClientConfig.NIO_CLIENT_CNXN_SOCKET);
|
assertEquals("org.apache.zookeeper.ClientCnxnSocketNIO", ZooKeeperClientConfig.NIO_CLIENT_CNXN_SOCKET);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPreferredZookeeperTlsProperty() {
|
||||||
|
final Properties properties = new Properties();
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, ZOOKEEPER_KEYSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, ZOOKEEPER_STORE_TYPE);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, ZOOKEEPER_TRUSTSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, ZOOKEEPER_STORE_TYPE);
|
||||||
|
|
||||||
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
assertEquals(ZOOKEEPER_KEYSTORE, zkClientConfig.getKeyStore());
|
||||||
|
assertEquals(ZOOKEEPER_TRUSTSTORE, zkClientConfig.getTrustStore());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreferredDefaultTlsPropertyOnly() {
|
||||||
|
final Properties properties = new Properties();
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_KEYSTORE, DEFAULT_KEYSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE, DEFAULT_STORE_TYPE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_TRUSTSTORE, DEFAULT_TRUSTSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, DEFAULT_STORE_TYPE);
|
||||||
|
|
||||||
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
assertEquals(DEFAULT_KEYSTORE, zkClientConfig.getKeyStore());
|
||||||
|
assertEquals(DEFAULT_TRUSTSTORE, zkClientConfig.getTrustStore());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetPreferredPropertyCombinationChoosesZookeeper() {
|
||||||
|
final Properties properties = new Properties();
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, ZOOKEEPER_KEYSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, ZOOKEEPER_STORE_TYPE);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, ZOOKEEPER_TRUSTSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, ZOOKEEPER_STORE_TYPE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_KEYSTORE, DEFAULT_KEYSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE, DEFAULT_STORE_TYPE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_TRUSTSTORE, DEFAULT_TRUSTSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, DEFAULT_STORE_TYPE);
|
||||||
|
|
||||||
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
assertEquals(ZOOKEEPER_KEYSTORE, zkClientConfig.getKeyStore());
|
||||||
|
assertEquals(ZOOKEEPER_TRUSTSTORE, zkClientConfig.getTrustStore());
|
||||||
|
assertEquals(ZOOKEEPER_STORE_TYPE, zkClientConfig.getKeyStoreType());
|
||||||
|
assertEquals(ZOOKEEPER_STORE_TYPE, zkClientConfig.getTrustStoreType());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIfGetPreferredPropertyIsBlankChoosesDefault() {
|
||||||
|
final Properties properties = new Properties();
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, LOCAL_CONNECT_STRING);
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, "");
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, "");
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, "");
|
||||||
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, "");
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_KEYSTORE, DEFAULT_KEYSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE, DEFAULT_STORE_TYPE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_TRUSTSTORE, DEFAULT_TRUSTSTORE);
|
||||||
|
properties.setProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, DEFAULT_STORE_TYPE);
|
||||||
|
|
||||||
|
final ZooKeeperClientConfig zkClientConfig = ZooKeeperClientConfig.createConfig(new StandardNiFiProperties(properties));
|
||||||
|
assertEquals(DEFAULT_KEYSTORE, zkClientConfig.getKeyStore());
|
||||||
|
assertEquals(DEFAULT_TRUSTSTORE, zkClientConfig.getTrustStore());
|
||||||
|
assertEquals(DEFAULT_STORE_TYPE, zkClientConfig.getKeyStoreType());
|
||||||
|
assertEquals(DEFAULT_STORE_TYPE, zkClientConfig.getTrustStoreType());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,283 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.controller.state.providers.zookeeper;
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.curator.test.InstanceSpec;
|
||||||
|
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
|
||||||
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
import org.apache.nifi.components.state.StateProvider;
|
||||||
|
import org.apache.nifi.components.state.StateProviderInitializationContext;
|
||||||
|
import org.apache.nifi.components.state.exception.StateTooLargeException;
|
||||||
|
import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
|
||||||
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.mock.MockComponentLogger;
|
||||||
|
import org.apache.nifi.parameter.ParameterLookup;
|
||||||
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
|
import org.apache.zookeeper.server.ServerCnxnFactory;
|
||||||
|
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.testng.Assert;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.apache.nifi.leader.election.TestSecureClientZooKeeperFactory.createAndStartServer;
|
||||||
|
import static org.apache.nifi.leader.election.TestSecureClientZooKeeperFactory.createClientProperties;
|
||||||
|
|
||||||
|
public class ITZooKeeperStateProvider extends AbstractTestStateProvider {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(ITZooKeeperStateProvider.class);
|
||||||
|
|
||||||
|
private volatile StateProvider provider;
|
||||||
|
private volatile ZooKeeperServer zkServer;
|
||||||
|
private static Map<PropertyDescriptor, String> stateProviderProperties = new HashMap<>();
|
||||||
|
private static Path tempDir;
|
||||||
|
private static Path dataDir;
|
||||||
|
private static int clientPort;
|
||||||
|
private static ServerCnxnFactory serverConnectionFactory;
|
||||||
|
private static NiFiProperties nifiProperties;
|
||||||
|
|
||||||
|
private static final String CLIENT_KEYSTORE = "src/test/resources/localhost-ks.jks";
|
||||||
|
private static final String CLIENT_TRUSTSTORE = "src/test/resources/localhost-ts.jks";
|
||||||
|
private static final String CLIENT_KEYSTORE_TYPE = "JKS";
|
||||||
|
private static final String CLIENT_TRUSTSTORE_TYPE = "JKS";
|
||||||
|
private static final String SERVER_KEYSTORE = "src/test/resources/localhost-ks.jks";
|
||||||
|
private static final String SERVER_TRUSTSTORE = "src/test/resources/localhost-ts.jks";
|
||||||
|
private static final String KEYSTORE_PASSWORD = "OI7kMpWzzVNVx/JGhTL/0uO4+PWpGJ46uZ/pfepbkwI";
|
||||||
|
private static final String TRUSTSTORE_PASSWORD = "wAOR0nQJ2EXvOP0JZ2EaqA/n7W69ILS4sWAHghmIWCc";
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
tempDir = Paths.get("target/TestZooKeeperStateProvider");
|
||||||
|
dataDir = tempDir.resolve("state");
|
||||||
|
clientPort = InstanceSpec.getRandomPort();
|
||||||
|
|
||||||
|
Files.createDirectory(tempDir);
|
||||||
|
|
||||||
|
// Set up the testing server
|
||||||
|
serverConnectionFactory = createAndStartServer(
|
||||||
|
dataDir,
|
||||||
|
tempDir,
|
||||||
|
clientPort,
|
||||||
|
Paths.get(SERVER_KEYSTORE),
|
||||||
|
KEYSTORE_PASSWORD,
|
||||||
|
Paths.get(SERVER_TRUSTSTORE),
|
||||||
|
TRUSTSTORE_PASSWORD
|
||||||
|
);
|
||||||
|
zkServer = serverConnectionFactory.getZooKeeperServer();
|
||||||
|
|
||||||
|
// Set up state provider (client) TLS properties, normally injected through StateProviderContext annotation
|
||||||
|
nifiProperties = createClientProperties(
|
||||||
|
clientPort,
|
||||||
|
Paths.get(CLIENT_KEYSTORE),
|
||||||
|
CLIENT_KEYSTORE_TYPE,
|
||||||
|
KEYSTORE_PASSWORD,
|
||||||
|
Paths.get(CLIENT_TRUSTSTORE),
|
||||||
|
CLIENT_TRUSTSTORE_TYPE,
|
||||||
|
TRUSTSTORE_PASSWORD
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set up state provider properties
|
||||||
|
stateProviderProperties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, "15 secs");
|
||||||
|
stateProviderProperties.put(ZooKeeperStateProvider.ROOT_NODE, "/nifi/team1/testing");
|
||||||
|
stateProviderProperties.put(ZooKeeperStateProvider.ACCESS_CONTROL, ZooKeeperStateProvider.OPEN_TO_WORLD.getValue());
|
||||||
|
final Map<PropertyDescriptor, String> properties = new HashMap<>(stateProviderProperties);
|
||||||
|
properties.put(ZooKeeperStateProvider.CONNECTION_STRING, "localhost:".concat(String.valueOf(clientPort)));
|
||||||
|
this.provider = createProvider(properties);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initializeProvider(final ZooKeeperStateProvider provider, final Map<PropertyDescriptor, String> properties) throws IOException {
|
||||||
|
provider.setNiFiProperties(nifiProperties);
|
||||||
|
provider.initialize(new StateProviderInitializationContext() {
|
||||||
|
@Override
|
||||||
|
public String getIdentifier() {
|
||||||
|
return "Unit Test Provider Initialization Context";
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<PropertyDescriptor, PropertyValue> getProperties() {
|
||||||
|
final Map<PropertyDescriptor, PropertyValue> propValueMap = new HashMap<>();
|
||||||
|
for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
|
||||||
|
propValueMap.put(entry.getKey(), new StandardPropertyValue(entry.getValue(), null, ParameterLookup.EMPTY));
|
||||||
|
}
|
||||||
|
return propValueMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String,String> getAllProperties() {
|
||||||
|
final Map<String,String> propValueMap = new LinkedHashMap<>();
|
||||||
|
for (final Map.Entry<PropertyDescriptor, PropertyValue> entry : getProperties().entrySet()) {
|
||||||
|
propValueMap.put(entry.getKey().getName(), entry.getValue().getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, Boolean.TRUE.toString());
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, CLIENT_KEYSTORE);
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, KEYSTORE_PASSWORD);
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, CLIENT_KEYSTORE_TYPE);
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE, CLIENT_TRUSTSTORE);
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_PASSWD, TRUSTSTORE_PASSWORD);
|
||||||
|
propValueMap.put(NiFiProperties.ZOOKEEPER_SECURITY_TRUSTSTORE_TYPE, CLIENT_TRUSTSTORE_TYPE);
|
||||||
|
|
||||||
|
return propValueMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PropertyValue getProperty(final PropertyDescriptor property) {
|
||||||
|
final String prop = properties.get(property);
|
||||||
|
return new StandardPropertyValue(prop, null, ParameterLookup.EMPTY);
|
||||||
|
}
|
||||||
|
|
||||||
|
// This won't be used by the ZooKeeper State Provider. I don't believe there's a way to pass an SSLContext
|
||||||
|
// directly to ZooKeeper anyway.
|
||||||
|
@Override
|
||||||
|
public SSLContext getSSLContext() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ComponentLog getLogger() {
|
||||||
|
return new MockComponentLogger();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception {
|
||||||
|
final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
|
||||||
|
initializeProvider(provider, properties);
|
||||||
|
provider.enable();
|
||||||
|
return provider;
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void clear() throws IOException {
|
||||||
|
try {
|
||||||
|
if (provider != null) {
|
||||||
|
provider.onComponentRemoved(componentId);
|
||||||
|
provider.disable();
|
||||||
|
provider.shutdown();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (zkServer != null) {
|
||||||
|
zkServer.shutdown(true);
|
||||||
|
clearDirectories();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void clearDirectories() {
|
||||||
|
try {
|
||||||
|
FileUtils.deleteDirectory(new File(tempDir.toString()));
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.error("Failed to delete: " + tempDir.toString(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected StateProvider getProvider() {
|
||||||
|
return provider;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testStateTooLargeExceptionThrownOnSetState() throws InterruptedException {
|
||||||
|
final Map<String, String> state = new HashMap<>();
|
||||||
|
final StringBuilder sb = new StringBuilder();
|
||||||
|
|
||||||
|
// Build a string that is a little less than 64 KB, because that's
|
||||||
|
// the largest value available for DataOutputStream.writeUTF
|
||||||
|
for (int i = 0; i < 6500; i++) {
|
||||||
|
sb.append("0123456789");
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
state.put("numbers." + i, sb.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
getProvider().setState(state, componentId);
|
||||||
|
Assert.fail("Expected StateTooLargeException");
|
||||||
|
} catch (final StateTooLargeException stle) {
|
||||||
|
// expected behavior.
|
||||||
|
break;
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
// If we attempt to interact with the server too quickly, we will get a
|
||||||
|
// ZooKeeper ConnectionLoss Exception, which the provider wraps in an IOException.
|
||||||
|
// We will wait 1 second in this case and try again. The test will timeout if this
|
||||||
|
// does not succeeed within 30 seconds.
|
||||||
|
Thread.sleep(1000L);
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Something went wrong attempting to set the state in testStateTooLargeExceptionThrownOnSetState()");
|
||||||
|
Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testStateTooLargeExceptionThrownOnReplace() throws IOException, InterruptedException {
|
||||||
|
final Map<String, String> state = new HashMap<>();
|
||||||
|
final StringBuilder sb = new StringBuilder();
|
||||||
|
|
||||||
|
// Build a string that is a little less than 64 KB, because that's
|
||||||
|
// the largest value available for DataOutputStream.writeUTF
|
||||||
|
for (int i = 0; i < 6500; i++) {
|
||||||
|
sb.append("0123456789");
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
state.put("numbers." + i, sb.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
final Map<String, String> smallState = new HashMap<>();
|
||||||
|
smallState.put("abc", "xyz");
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
getProvider().setState(smallState, componentId);
|
||||||
|
break;
|
||||||
|
} catch (final IOException ioe) {
|
||||||
|
// If we attempt to interact with the server too quickly, we will get a
|
||||||
|
// ZooKeeper ConnectionLoss Exception, which the provider wraps in an IOException.
|
||||||
|
// We will wait 1 second in this case and try again. The test will timeout if this
|
||||||
|
// does not succeeed within 30 seconds.
|
||||||
|
Thread.sleep(1000L);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
getProvider().replace(getProvider().getState(componentId), state, componentId);
|
||||||
|
Assert.fail("Expected StateTooLargeException");
|
||||||
|
} catch (final StateTooLargeException stle) {
|
||||||
|
// expected behavior.
|
||||||
|
} catch (final Exception e) {
|
||||||
|
logger.error("Something went wrong in attempting to get the state in testStateTooLargeExceptionThrownOnReplace()");
|
||||||
|
Assert.fail("Expected StateTooLargeException", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -26,6 +26,7 @@ import org.apache.nifi.components.state.StateProviderInitializationContext;
|
||||||
import org.apache.nifi.components.state.exception.StateTooLargeException;
|
import org.apache.nifi.components.state.exception.StateTooLargeException;
|
||||||
import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
|
import org.apache.nifi.controller.state.providers.AbstractTestStateProvider;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -36,6 +37,9 @@ import java.io.IOException;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
|
public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
|
||||||
|
|
||||||
|
@ -43,6 +47,13 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
|
||||||
private volatile TestingServer zkServer;
|
private volatile TestingServer zkServer;
|
||||||
|
|
||||||
private static final Map<PropertyDescriptor, String> defaultProperties = new HashMap<>();
|
private static final Map<PropertyDescriptor, String> defaultProperties = new HashMap<>();
|
||||||
|
private static NiFiProperties nifiProperties;
|
||||||
|
private static final String KEYSTORE = "/a/keyStore.jks";
|
||||||
|
private static final String KEYSTORE_PASSWORD = "aKeystorePassword";
|
||||||
|
private static final String KEYSTORE_TYPE = "JKS";
|
||||||
|
private static final String TRUSTSTORE = "/a/trustStore.jks";
|
||||||
|
private static final String TRUSTSTORE_PASSWORD = "aTruststorePassword";
|
||||||
|
private static final String TRUSTSTORE_TYPE = "JKS";
|
||||||
|
|
||||||
static {
|
static {
|
||||||
defaultProperties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, "15 secs");
|
defaultProperties.put(ZooKeeperStateProvider.SESSION_TIMEOUT, "15 secs");
|
||||||
|
@ -105,11 +116,22 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
|
||||||
|
|
||||||
private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception {
|
private ZooKeeperStateProvider createProvider(final Map<PropertyDescriptor, String> properties) throws Exception {
|
||||||
final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
|
final ZooKeeperStateProvider provider = new ZooKeeperStateProvider();
|
||||||
|
nifiProperties = createTestNiFiProperties();
|
||||||
|
provider.setNiFiProperties(nifiProperties);
|
||||||
initializeProvider(provider, properties);
|
initializeProvider(provider, properties);
|
||||||
provider.enable();
|
provider.enable();
|
||||||
return provider;
|
return provider;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private NiFiProperties createTestNiFiProperties() {
|
||||||
|
Properties keystoreProps = new Properties();
|
||||||
|
keystoreProps.setProperty(NiFiProperties.SECURITY_KEYSTORE, KEYSTORE);
|
||||||
|
keystoreProps.setProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD, KEYSTORE_PASSWORD);
|
||||||
|
keystoreProps.setProperty(NiFiProperties.SECURITY_KEYSTORE_TYPE, KEYSTORE_TYPE);
|
||||||
|
|
||||||
|
return NiFiProperties.createBasicNiFiProperties(null, keystoreProps);
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void clear() throws IOException {
|
public void clear() throws IOException {
|
||||||
try {
|
try {
|
||||||
|
@ -160,7 +182,6 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
|
||||||
// does not succeeed within 30 seconds.
|
// does not succeeed within 30 seconds.
|
||||||
Thread.sleep(1000L);
|
Thread.sleep(1000L);
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
e.printStackTrace();
|
|
||||||
Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
|
Assert.fail("Expected StateTooLargeException but " + e.getClass() + " was thrown", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -203,9 +224,48 @@ public class TestZooKeeperStateProvider extends AbstractTestStateProvider {
|
||||||
} catch (final StateTooLargeException stle) {
|
} catch (final StateTooLargeException stle) {
|
||||||
// expected behavior.
|
// expected behavior.
|
||||||
} catch (final Exception e) {
|
} catch (final Exception e) {
|
||||||
e.printStackTrace();
|
|
||||||
Assert.fail("Expected StateTooLargeException", e);
|
Assert.fail("Expected StateTooLargeException", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombineProperties() {
|
||||||
|
Properties truststoreProps = new Properties();
|
||||||
|
truststoreProps.setProperty(NiFiProperties.SECURITY_TRUSTSTORE, TRUSTSTORE);
|
||||||
|
truststoreProps.setProperty(NiFiProperties.SECURITY_TRUSTSTORE_PASSWD, TRUSTSTORE_PASSWORD);
|
||||||
|
truststoreProps.setProperty(NiFiProperties.SECURITY_TRUSTSTORE_TYPE, TRUSTSTORE_TYPE);
|
||||||
|
|
||||||
|
NiFiProperties combinedProperties = ZooKeeperStateProvider.combineProperties(nifiProperties, truststoreProps);
|
||||||
|
assertEquals(KEYSTORE, combinedProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE));
|
||||||
|
assertEquals(TRUSTSTORE, combinedProperties.getProperty(NiFiProperties.SECURITY_TRUSTSTORE));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombinePropertiesOverridesWithAdditionalProperties() {
|
||||||
|
final String OVERRIDE_KEYSTORE = "/override/keystore.jks";
|
||||||
|
final String OVERRIDE_KEYSTORE_PASSWORD = "overridePassword";
|
||||||
|
|
||||||
|
Properties overrideProps = new Properties();
|
||||||
|
overrideProps.setProperty(NiFiProperties.SECURITY_KEYSTORE, OVERRIDE_KEYSTORE);
|
||||||
|
overrideProps.setProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD, OVERRIDE_KEYSTORE_PASSWORD);
|
||||||
|
|
||||||
|
NiFiProperties combinedProperties = ZooKeeperStateProvider.combineProperties(nifiProperties, overrideProps);
|
||||||
|
assertEquals(OVERRIDE_KEYSTORE, combinedProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE));
|
||||||
|
assertEquals(OVERRIDE_KEYSTORE_PASSWORD, combinedProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCombineNullNiFiPropsWithPropertiesOverrides() {
|
||||||
|
final String OVERRIDE_KEYSTORE = "/override/keystore.jks";
|
||||||
|
final String OVERRIDE_KEYSTORE_PASSWORD = "overridePassword";
|
||||||
|
|
||||||
|
Properties overrideProps = new Properties();
|
||||||
|
overrideProps.setProperty(NiFiProperties.SECURITY_KEYSTORE, OVERRIDE_KEYSTORE);
|
||||||
|
overrideProps.setProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD, OVERRIDE_KEYSTORE_PASSWORD);
|
||||||
|
|
||||||
|
NiFiProperties combinedProperties = ZooKeeperStateProvider.combineProperties(null, overrideProps);
|
||||||
|
assertEquals(OVERRIDE_KEYSTORE, combinedProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE));
|
||||||
|
assertEquals(OVERRIDE_KEYSTORE_PASSWORD, combinedProperties.getProperty(NiFiProperties.SECURITY_KEYSTORE_PASSWD));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,6 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
|
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
import org.apache.nifi.properties.StandardNiFiProperties;
|
import org.apache.nifi.properties.StandardNiFiProperties;
|
||||||
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
|
|
||||||
|
|
||||||
import org.apache.zookeeper.client.FourLetterWordMain;
|
import org.apache.zookeeper.client.FourLetterWordMain;
|
||||||
import org.apache.zookeeper.common.X509Exception.SSLContextException;
|
import org.apache.zookeeper.common.X509Exception.SSLContextException;
|
||||||
|
@ -72,7 +71,7 @@ public class TestZooKeeperStateServer {
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, zkServerConfig.toString());
|
properties.setProperty(NiFiProperties.STATE_MANAGEMENT_ZOOKEEPER_PROPERTIES, zkServerConfig.toString());
|
||||||
properties.setProperty(NiFiProperties.STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER, "true");
|
properties.setProperty(NiFiProperties.STATE_MANAGEMENT_START_EMBEDDED_ZOOKEEPER, Boolean.TRUE.toString());
|
||||||
|
|
||||||
zkServer = ZooKeeperStateServer.create(new StandardNiFiProperties(properties));
|
zkServer = ZooKeeperStateServer.create(new StandardNiFiProperties(properties));
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
import org.apache.nifi.util.NiFiProperties;
|
import org.apache.nifi.util.NiFiProperties;
|
||||||
import org.apache.nifi.properties.StandardNiFiProperties;
|
import org.apache.nifi.properties.StandardNiFiProperties;
|
||||||
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
|
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
|
||||||
import org.apache.nifi.controller.leader.election.CuratorLeaderElectionManager.SecureClientZooKeeperFactory;
|
import org.apache.nifi.controller.cluster.SecureClientZooKeeperFactory;
|
||||||
import org.apache.nifi.security.util.CertificateUtils;
|
import org.apache.nifi.security.util.CertificateUtils;
|
||||||
|
|
||||||
import org.apache.zookeeper.common.ClientX509Util;
|
import org.apache.zookeeper.common.ClientX509Util;
|
||||||
|
@ -36,9 +36,6 @@ import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.testng.Assert.assertEquals;
|
|
||||||
import static org.testng.Assert.assertNotNull;
|
|
||||||
|
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
@ -58,6 +55,9 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
public class TestSecureClientZooKeeperFactory {
|
public class TestSecureClientZooKeeperFactory {
|
||||||
|
|
||||||
private static final String NETTY_SERVER_CNXN_FACTORY =
|
private static final String NETTY_SERVER_CNXN_FACTORY =
|
||||||
|
@ -182,10 +182,10 @@ public class TestSecureClientZooKeeperFactory {
|
||||||
assertNotNull(checkExistsResult);
|
assertNotNull(checkExistsResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ServerCnxnFactory createAndStartServer(final Path dataDir,
|
public static ServerCnxnFactory createAndStartServer(final Path dataDir,
|
||||||
final Path tempDir, final int clientPort, final Path keyStore,
|
final Path tempDir, final int clientPort, final Path keyStore,
|
||||||
final String keyStorePassword, final Path trustStore,
|
final String keyStorePassword, final Path trustStore,
|
||||||
final String trustStorePassword) throws IOException, InterruptedException {
|
final String trustStorePassword) throws IOException, InterruptedException {
|
||||||
|
|
||||||
final ClientX509Util x509Util = new ClientX509Util();
|
final ClientX509Util x509Util = new ClientX509Util();
|
||||||
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, NETTY_SERVER_CNXN_FACTORY);
|
System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, NETTY_SERVER_CNXN_FACTORY);
|
||||||
|
@ -204,13 +204,13 @@ public class TestSecureClientZooKeeperFactory {
|
||||||
return secureConnectionFactory;
|
return secureConnectionFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static NiFiProperties createClientProperties(final int clientPort,
|
public static NiFiProperties createClientProperties(final int clientPort,
|
||||||
final Path keyStore, final String keyStoreType, final String keyStorePassword,
|
final Path keyStore, final String keyStoreType, final String keyStorePassword,
|
||||||
final Path trustStore, final String trustStoreType, final String trustStorePassword) {
|
final Path trustStore, final String trustStoreType, final String trustStorePassword) {
|
||||||
|
|
||||||
final Properties properties = new Properties();
|
final Properties properties = new Properties();
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, String.format("localhost:%d", clientPort));
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CONNECT_STRING, String.format("localhost:%d", clientPort));
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, "true");
|
properties.setProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE, Boolean.TRUE.toString());
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, keyStore.toString());
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, keyStore.toString());
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, keyStoreType);
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, keyStoreType);
|
||||||
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, keyStorePassword);
|
properties.setProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, keyStorePassword);
|
||||||
|
@ -221,38 +221,38 @@ public class TestSecureClientZooKeeperFactory {
|
||||||
return new StandardNiFiProperties(properties);
|
return new StandardNiFiProperties(properties);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static X509Certificate createKeyStore(final String alias,
|
public static X509Certificate createKeyStore(final String alias,
|
||||||
final String password, final Path path, final String keyStoreType)
|
final String password, final Path path, final String keyStoreType)
|
||||||
throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
|
throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
|
||||||
|
|
||||||
try (final FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
|
try (final FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
|
||||||
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
|
final KeyPair keyPair = KeyPairGenerator.getInstance("RSA").generateKeyPair();
|
||||||
|
|
||||||
final X509Certificate selfSignedCert = CertificateUtils.generateSelfSignedX509Certificate(
|
final X509Certificate selfSignedCert = CertificateUtils.generateSelfSignedX509Certificate(
|
||||||
keyPair, "CN=localhost", "SHA256withRSA", 365
|
keyPair, "CN=localhost", "SHA256withRSA", 365
|
||||||
);
|
);
|
||||||
|
|
||||||
final char[] passwordChars = password.toCharArray();
|
final char[] passwordChars = password.toCharArray();
|
||||||
final KeyStore keyStore = KeyStore.getInstance(keyStoreType);
|
final KeyStore keyStore = KeyStore.getInstance(keyStoreType);
|
||||||
keyStore.load(null, null);
|
keyStore.load(null, null);
|
||||||
keyStore.setKeyEntry(alias, keyPair.getPrivate(), passwordChars,
|
keyStore.setKeyEntry(alias, keyPair.getPrivate(), passwordChars,
|
||||||
new Certificate[]{selfSignedCert});
|
new Certificate[]{selfSignedCert});
|
||||||
keyStore.store(outputStream, passwordChars);
|
keyStore.store(outputStream, passwordChars);
|
||||||
|
|
||||||
return selfSignedCert;
|
return selfSignedCert;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void createTrustStore(final X509Certificate cert,
|
public static void createTrustStore(final X509Certificate cert,
|
||||||
final String alias, final String password, final Path path, final String keyStoreType)
|
final String alias, final String password, final Path path, final String keyStoreType)
|
||||||
throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
|
throws IOException, KeyStoreException, NoSuchAlgorithmException, CertificateException {
|
||||||
|
|
||||||
try (final FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
|
try (final FileOutputStream outputStream = new FileOutputStream(path.toFile())) {
|
||||||
final KeyStore trustStore = KeyStore.getInstance(keyStoreType);
|
final KeyStore trustStore = KeyStore.getInstance(keyStoreType);
|
||||||
trustStore.load(null, null);
|
trustStore.load(null, null);
|
||||||
trustStore.setCertificateEntry(alias, cert);
|
trustStore.setCertificateEntry(alias, cert);
|
||||||
trustStore.store(outputStream, password.toCharArray());
|
trustStore.store(outputStream, password.toCharArray());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
Loading…
Reference in New Issue