NIFI-5233 - Add EL support with Variable Registry scope for HBase client service

This closes #2738

Signed-off-by: Mike Thomsen <mikerthomsen@gmail.com>
This commit is contained in:
Pierre Villard 2018-05-24 16:42:27 +02:00 committed by Mike Thomsen
parent cf6089196f
commit 64cd34016b
3 changed files with 23 additions and 13 deletions

View File

@ -42,24 +42,28 @@ public interface HBaseClientService extends ControllerService {
" such as hbase-site.xml and core-site.xml for kerberos, " + " such as hbase-site.xml and core-site.xml for kerberos, " +
"including full paths to the files.") "including full paths to the files.")
.addValidator(new ConfigFilesValidator()) .addValidator(new ConfigFilesValidator())
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder() PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
.name("ZooKeeper Quorum") .name("ZooKeeper Quorum")
.description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.") .description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
PropertyDescriptor ZOOKEEPER_CLIENT_PORT = new PropertyDescriptor.Builder() PropertyDescriptor ZOOKEEPER_CLIENT_PORT = new PropertyDescriptor.Builder()
.name("ZooKeeper Client Port") .name("ZooKeeper Client Port")
.description("The port on which ZooKeeper is accepting client connections. Required if Hadoop Configuration Files are not provided.") .description("The port on which ZooKeeper is accepting client connections. Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.PORT_VALIDATOR) .addValidator(StandardValidators.PORT_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
PropertyDescriptor ZOOKEEPER_ZNODE_PARENT = new PropertyDescriptor.Builder() PropertyDescriptor ZOOKEEPER_ZNODE_PARENT = new PropertyDescriptor.Builder()
.name("ZooKeeper ZNode Parent") .name("ZooKeeper ZNode Parent")
.description("The ZooKeeper ZNode Parent value for HBase (example: /hbase). Required if Hadoop Configuration Files are not provided.") .description("The ZooKeeper ZNode Parent value for HBase (example: /hbase). Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
PropertyDescriptor HBASE_CLIENT_RETRIES = new PropertyDescriptor.Builder() PropertyDescriptor HBASE_CLIENT_RETRIES = new PropertyDescriptor.Builder()
@ -67,6 +71,7 @@ public interface HBaseClientService extends ControllerService {
.description("The number of times the HBase client will retry connecting. Required if Hadoop Configuration Files are not provided.") .description("The number of times the HBase client will retry connecting. Required if Hadoop Configuration Files are not provided.")
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1") .defaultValue("1")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build(); .build();
PropertyDescriptor PHOENIX_CLIENT_JAR_LOCATION = new PropertyDescriptor.Builder() PropertyDescriptor PHOENIX_CLIENT_JAR_LOCATION = new PropertyDescriptor.Builder()

View File

@ -199,7 +199,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
} }
if (confFileProvided) { if (confFileProvided) {
final String configFiles = validationContext.getProperty(HADOOP_CONF_FILES).getValue(); final String configFiles = validationContext.getProperty(HADOOP_CONF_FILES).evaluateAttributeExpressions().getValue();
ValidationResources resources = validationResourceHolder.get(); ValidationResources resources = validationResourceHolder.get();
// if no resources in the holder, or if the holder has different resources loaded, // if no resources in the holder, or if the holder has different resources loaded,
@ -268,21 +268,21 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
} }
protected Connection createConnection(final ConfigurationContext context) throws IOException, InterruptedException { protected Connection createConnection(final ConfigurationContext context) throws IOException, InterruptedException {
final String configFiles = context.getProperty(HADOOP_CONF_FILES).getValue(); final String configFiles = context.getProperty(HADOOP_CONF_FILES).evaluateAttributeExpressions().getValue();
final Configuration hbaseConfig = getConfigurationFromFiles(configFiles); final Configuration hbaseConfig = getConfigurationFromFiles(configFiles);
// override with any properties that are provided // override with any properties that are provided
if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) { if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
hbaseConfig.set(HBASE_CONF_ZK_QUORUM, context.getProperty(ZOOKEEPER_QUORUM).getValue()); hbaseConfig.set(HBASE_CONF_ZK_QUORUM, context.getProperty(ZOOKEEPER_QUORUM).evaluateAttributeExpressions().getValue());
} }
if (context.getProperty(ZOOKEEPER_CLIENT_PORT).isSet()) { if (context.getProperty(ZOOKEEPER_CLIENT_PORT).isSet()) {
hbaseConfig.set(HBASE_CONF_ZK_PORT, context.getProperty(ZOOKEEPER_CLIENT_PORT).getValue()); hbaseConfig.set(HBASE_CONF_ZK_PORT, context.getProperty(ZOOKEEPER_CLIENT_PORT).evaluateAttributeExpressions().getValue());
} }
if (context.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet()) { if (context.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet()) {
hbaseConfig.set(HBASE_CONF_ZNODE_PARENT, context.getProperty(ZOOKEEPER_ZNODE_PARENT).getValue()); hbaseConfig.set(HBASE_CONF_ZNODE_PARENT, context.getProperty(ZOOKEEPER_ZNODE_PARENT).evaluateAttributeExpressions().getValue());
} }
if (context.getProperty(HBASE_CLIENT_RETRIES).isSet()) { if (context.getProperty(HBASE_CLIENT_RETRIES).isSet()) {
hbaseConfig.set(HBASE_CONF_CLIENT_RETRIES, context.getProperty(HBASE_CLIENT_RETRIES).getValue()); hbaseConfig.set(HBASE_CONF_CLIENT_RETRIES, context.getProperty(HBASE_CLIENT_RETRIES).evaluateAttributeExpressions().getValue());
} }
// add any dynamic properties to the HBase configuration // add any dynamic properties to the HBase configuration

View File

@ -88,10 +88,15 @@ public class TestHBase_1_1_2_ClientService {
runner.assertNotValid(service); runner.assertNotValid(service);
runner.removeControllerService(service); runner.removeControllerService(service);
runner.setVariable("hadoop-conf-files", "src/test/resources/hbase-site.xml");
runner.setVariable("zk-quorum", "localhost");
runner.setVariable("zk-client-port", "2181");
runner.setVariable("zk-znode", "/hbase");
// conf file with no zk properties should be valid // conf file with no zk properties should be valid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service); runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/hbase-site.xml"); runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "${hadoop-conf-files}");
runner.enableControllerService(service); runner.enableControllerService(service);
runner.assertValid(service); runner.assertValid(service);
@ -100,7 +105,7 @@ public class TestHBase_1_1_2_ClientService {
// only quorum and no conf file should be invalid // only quorum and no conf file should be invalid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service); runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}");
runner.enableControllerService(service); runner.enableControllerService(service);
runner.assertNotValid(service); runner.assertNotValid(service);
@ -109,8 +114,8 @@ public class TestHBase_1_1_2_ClientService {
// quorum and port, no znode, no conf file, should be invalid // quorum and port, no znode, no conf file, should be invalid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service); runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}");
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "${zk-client-port}");
runner.enableControllerService(service); runner.enableControllerService(service);
runner.assertNotValid(service); runner.assertNotValid(service);
@ -119,9 +124,9 @@ public class TestHBase_1_1_2_ClientService {
// quorum, port, and znode, no conf file, should be valid // quorum, port, and znode, no conf file, should be valid
service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile); service = new MockHBaseClientService(table, COL_FAM, kerberosPropsWithFile);
runner.addControllerService("hbaseClientService", service); runner.addControllerService("hbaseClientService", service);
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "${zk-quorum}");
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "${zk-client-port}");
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase"); runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_ZNODE_PARENT, "${zk-znode}");
runner.enableControllerService(service); runner.enableControllerService(service);
runner.assertValid(service); runner.assertValid(service);