NIFI-10481 Support disabling ZooKeeper Ensemble Tracking

This closes #6400

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
nathluu 2022-09-12 07:19:47 +07:00 committed by exceptionfactory
parent 4f77a17d19
commit ea93dec079
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
4 changed files with 26 additions and 3 deletions

View File

@ -277,6 +277,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String ZOOKEEPER_CONNECT_TIMEOUT = "nifi.zookeeper.connect.timeout";
public static final String ZOOKEEPER_SESSION_TIMEOUT = "nifi.zookeeper.session.timeout";
public static final String ZOOKEEPER_ROOT_NODE = "nifi.zookeeper.root.node";
public static final String ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER = "nifi.zookeeper.client.ensembleTracker";
public static final String ZOOKEEPER_CLIENT_SECURE = "nifi.zookeeper.client.secure";
public static final String ZOOKEEPER_SECURITY_KEYSTORE = "nifi.zookeeper.security.keystore";
public static final String ZOOKEEPER_SECURITY_KEYSTORE_TYPE = "nifi.zookeeper.security.keystoreType";
@ -369,6 +370,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String DEFAULT_ZOOKEEPER_SESSION_TIMEOUT = "3 secs";
public static final String DEFAULT_ZOOKEEPER_ROOT_NODE = "/nifi";
public static final boolean DEFAULT_ZOOKEEPER_CLIENT_SECURE = false;
public static final boolean DEFAULT_ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER = true;
public static final String DEFAULT_ZOOKEEPER_AUTH_TYPE = "default";
public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_HOST_FROM_PRINCIPAL = "true";
public static final String DEFAULT_ZOOKEEPER_KERBEROS_REMOVE_REALM_FROM_PRINCIPAL = "true";
@ -1719,6 +1721,17 @@ public class NiFiProperties extends ApplicationProperties {
return Boolean.parseBoolean(clientSecure);
}
public boolean isZookeeperClientWithEnsembleTracker() {
final String defaultValue = String.valueOf(DEFAULT_ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER);
final String withEnsembleTracker = getProperty(ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, defaultValue).trim();
if (!"true".equalsIgnoreCase(withEnsembleTracker) && !"false".equalsIgnoreCase(withEnsembleTracker)) {
throw new RuntimeException(String.format("%s was '%s', expected true or false", NiFiProperties.ZOOKEEPER_CLIENT_ENSEMBLE_TRACKER, withEnsembleTracker));
}
return Boolean.parseBoolean(withEnsembleTracker);
}
public boolean isZooKeeperTlsConfigurationPresent() {
return StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_CLIENT_SECURE))
&& StringUtils.isNotBlank(getProperty(NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE))

View File

@ -2869,7 +2869,8 @@ in the _$NIFI_HOME/conf/nifi.properties_ file:
[options="header,footer"]
|===
|Property Name|Description|Default
|`nifi.zookeeper.client.secure`|Whether to acccess ZooKeeper using client TLS.|false
|`nifi.zookeeper.client.ensembleTracker`|Whether to enable ZooKeeper client Ensemble Tracking.|true
|`nifi.zookeeper.client.secure`|Whether to access ZooKeeper using client TLS.|false
|`nifi.zookeeper.security.keystore`|Filename of the Keystore containing the private key to use when communicating with ZooKeeper.|_none_
|`nifi.zookeeper.security.keystoreType`|Optional. The type of the Keystore. Must be `PKCS12`, `JKS`, or `PEM`. If not specified the type will be determined from the file extension (`.p12`, `.jks`, `.pem`).|_none_
|`nifi.zookeeper.security.keystorePasswd`|The password for the Keystore.|_none_

View File

@ -44,6 +44,7 @@ public class ZooKeeperClientConfig {
private final int connectionTimeoutMillis;
private final String rootPath;
private final boolean clientSecure;
private final boolean withEnsembleTracker;
private final String keyStore;
private final String keyStoreType;
private final String keyStorePassword;
@ -60,7 +61,7 @@ public class ZooKeeperClientConfig {
String rootPath, String authType, String authPrincipal, String removeHostFromPrincipal,
String removeRealmFromPrincipal, boolean clientSecure, String keyStore, String keyStoreType,
String keyStorePassword, String trustStore, String trustStoreType, String trustStorePassword,
final int juteMaxbuffer) {
final int juteMaxbuffer, boolean withEnsembleTracker) {
this.connectString = connectString;
this.sessionTimeoutMillis = sessionTimeoutMillis;
this.connectionTimeoutMillis = connectionTimeoutMillis;
@ -77,6 +78,7 @@ public class ZooKeeperClientConfig {
this.removeHostFromPrincipal = removeHostFromPrincipal;
this.removeRealmFromPrincipal = removeRealmFromPrincipal;
this.juteMaxbuffer = juteMaxbuffer;
this.withEnsembleTracker = withEnsembleTracker;
}
public String getConnectString() {
@ -99,6 +101,10 @@ public class ZooKeeperClientConfig {
return clientSecure;
}
public boolean isWithEnsembleTracker() {
return withEnsembleTracker;
}
public String getConnectionSocket() {
return (isClientSecure() ? NETTY_CLIENT_CNXN_SOCKET : NIO_CLIENT_CNXN_SOCKET);
}
@ -169,6 +175,7 @@ public class ZooKeeperClientConfig {
final long connectionTimeoutMs = getTimePeriod(nifiProperties, NiFiProperties.ZOOKEEPER_CONNECT_TIMEOUT, NiFiProperties.DEFAULT_ZOOKEEPER_CONNECT_TIMEOUT);
final String rootPath = nifiProperties.getProperty(NiFiProperties.ZOOKEEPER_ROOT_NODE, NiFiProperties.DEFAULT_ZOOKEEPER_ROOT_NODE);
final boolean clientSecure = nifiProperties.isZooKeeperClientSecure();
final boolean withEnsembleTracker = nifiProperties.isZookeeperClientWithEnsembleTracker();
final String keyStore = getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE, NiFiProperties.SECURITY_KEYSTORE);
final String keyStoreType = StringUtils.stripToNull(getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_TYPE, NiFiProperties.SECURITY_KEYSTORE_TYPE));
final String keyStorePassword = getPreferredProperty(nifiProperties, NiFiProperties.ZOOKEEPER_SECURITY_KEYSTORE_PASSWD, NiFiProperties.SECURITY_KEYSTORE_PASSWD);
@ -205,7 +212,8 @@ public class ZooKeeperClientConfig {
trustStore,
trustStoreType,
trustStorePassword,
juteMaxbuffer
juteMaxbuffer,
withEnsembleTracker
);
}

View File

@ -427,6 +427,7 @@ public class CuratorLeaderElectionManager implements LeaderElectionManager {
.connectString(zkConfig.getConnectString())
.sessionTimeoutMs(zkConfig.getSessionTimeoutMillis())
.connectionTimeoutMs(zkConfig.getConnectionTimeoutMillis())
.ensembleTracker(zkConfig.isWithEnsembleTracker())
.retryPolicy(retryPolicy)
.aclProvider(aclProviderFactory.create(zkConfig))
.defaultData(new byte[0]);