mirror of https://github.com/apache/nifi.git
NIFI-1175 Exposing minimum properties required to create an HBase connection on the HBaseClientService as an optional alternative to the conf files
This commit is contained in:
parent
453b140d6b
commit
2b9b5e008f
|
@ -24,6 +24,7 @@ import org.apache.nifi.hbase.put.PutFlowFile;
|
||||||
import org.apache.nifi.hbase.scan.Column;
|
import org.apache.nifi.hbase.scan.Column;
|
||||||
import org.apache.nifi.hbase.scan.ResultHandler;
|
import org.apache.nifi.hbase.scan.ResultHandler;
|
||||||
import org.apache.nifi.hbase.validate.ConfigFilesValidator;
|
import org.apache.nifi.hbase.validate.ConfigFilesValidator;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -34,12 +35,35 @@ public interface HBaseClientService extends ControllerService {
|
||||||
|
|
||||||
PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
|
PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
|
||||||
.name("Hadoop Configuration Files")
|
.name("Hadoop Configuration Files")
|
||||||
.description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml")
|
.description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml, including full paths to the files.")
|
||||||
.required(true)
|
|
||||||
.defaultValue("./conf/hbase-site.xml")
|
|
||||||
.addValidator(new ConfigFilesValidator())
|
.addValidator(new ConfigFilesValidator())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
|
||||||
|
.name("ZooKeeper Quorum")
|
||||||
|
.description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
PropertyDescriptor ZOOKEEPER_CLIENT_PORT = new PropertyDescriptor.Builder()
|
||||||
|
.name("ZooKeeper Client Port")
|
||||||
|
.description("The port on which ZooKeeper is accepting client connections. Required if Hadoop Configuration Files are not provided.")
|
||||||
|
.addValidator(StandardValidators.PORT_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
PropertyDescriptor ZOOKEEPER_ZNODE_PARENT = new PropertyDescriptor.Builder()
|
||||||
|
.name("ZooKeeper ZNode Parent")
|
||||||
|
.description("The ZooKeeper ZNode Parent value for HBase (example: /hbase). Required if Hadoop Configuration Files are not provided.")
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
PropertyDescriptor HBASE_CLIENT_RETRIES = new PropertyDescriptor.Builder()
|
||||||
|
.name("HBase Client Retries")
|
||||||
|
.description("The number of times the HBase client will retry connecting. Required if Hadoop Configuration Files are not provided.")
|
||||||
|
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
|
||||||
|
.defaultValue("1")
|
||||||
|
.build();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Puts a batch of mutations to the given table.
|
* Puts a batch of mutations to the given table.
|
||||||
*
|
*
|
||||||
|
|
|
@ -57,6 +57,12 @@
|
||||||
<artifactId>commons-lang3</artifactId>
|
<artifactId>commons-lang3</artifactId>
|
||||||
<version>3.4</version>
|
<version>3.4</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.slf4j</groupId>
|
||||||
|
<artifactId>log4j-over-slf4j</artifactId>
|
||||||
|
<version>${org.slf4j.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
<!-- test dependencies -->
|
<!-- test dependencies -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
@ -31,11 +32,14 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
import org.apache.nifi.annotation.lifecycle.OnDisabled;
|
||||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
|
import org.apache.nifi.components.ValidationContext;
|
||||||
|
import org.apache.nifi.components.ValidationResult;
|
||||||
import org.apache.nifi.controller.AbstractControllerService;
|
import org.apache.nifi.controller.AbstractControllerService;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||||
|
@ -43,6 +47,7 @@ import org.apache.nifi.hbase.put.PutFlowFile;
|
||||||
import org.apache.nifi.hbase.scan.Column;
|
import org.apache.nifi.hbase.scan.Column;
|
||||||
import org.apache.nifi.hbase.scan.ResultCell;
|
import org.apache.nifi.hbase.scan.ResultCell;
|
||||||
import org.apache.nifi.hbase.scan.ResultHandler;
|
import org.apache.nifi.hbase.scan.ResultHandler;
|
||||||
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
import org.apache.nifi.reporting.InitializationException;
|
import org.apache.nifi.reporting.InitializationException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -55,9 +60,20 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@Tags({ "hbase", "client"})
|
@Tags({ "hbase", "client"})
|
||||||
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2.")
|
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " +
|
||||||
|
"a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " +
|
||||||
|
"are provided, they will be loaded first, and the values of the additional properties will override the values from " +
|
||||||
|
"the configuration files. In addition, any user defined properties on the processor will also be passed to the HBase " +
|
||||||
|
"configuration.")
|
||||||
|
@DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.",
|
||||||
|
description="These properties will be set on the HBase configuration after loading any provided configuration files.")
|
||||||
public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
|
public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
|
||||||
|
|
||||||
|
static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
|
||||||
|
static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
|
||||||
|
static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
|
||||||
|
static final String HBASE_CONF_CLIENT_RETRIES = "hbase.client.retries.number";
|
||||||
|
|
||||||
private volatile Connection connection;
|
private volatile Connection connection;
|
||||||
private List<PropertyDescriptor> properties;
|
private List<PropertyDescriptor> properties;
|
||||||
|
|
||||||
|
@ -65,6 +81,10 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
||||||
List<PropertyDescriptor> props = new ArrayList<>();
|
List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
props.add(HADOOP_CONF_FILES);
|
props.add(HADOOP_CONF_FILES);
|
||||||
|
props.add(ZOOKEEPER_QUORUM);
|
||||||
|
props.add(ZOOKEEPER_CLIENT_PORT);
|
||||||
|
props.add(ZOOKEEPER_ZNODE_PARENT);
|
||||||
|
props.add(HBASE_CLIENT_RETRIES);
|
||||||
this.properties = Collections.unmodifiableList(props);
|
this.properties = Collections.unmodifiableList(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,16 +93,83 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.description("Specifies the value for '" + propertyDescriptorName + "' in the HBase configuration.")
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.dynamic(true)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||||
|
boolean confFileProvided = validationContext.getProperty(HADOOP_CONF_FILES).isSet();
|
||||||
|
boolean zkQuorumProvided = validationContext.getProperty(ZOOKEEPER_QUORUM).isSet();
|
||||||
|
boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet();
|
||||||
|
boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet();
|
||||||
|
boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet();
|
||||||
|
|
||||||
|
final List<ValidationResult> problems = new ArrayList<>();
|
||||||
|
|
||||||
|
if (!confFileProvided && (!zkQuorumProvided || !zkPortProvided || !znodeParentProvided || !retriesProvided)) {
|
||||||
|
problems.add(new ValidationResult.Builder()
|
||||||
|
.valid(false)
|
||||||
|
.subject(this.getClass().getSimpleName())
|
||||||
|
.explanation("ZooKeeper Quorum, ZooKeeper Client Port, ZooKeeper ZNode Parent, and HBase Client Retries are required " +
|
||||||
|
"when Hadoop Configuration Files are not provided.")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
|
return problems;
|
||||||
|
}
|
||||||
|
|
||||||
@OnEnabled
|
@OnEnabled
|
||||||
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
|
public void onEnabled(final ConfigurationContext context) throws InitializationException, IOException {
|
||||||
this.connection = createConnection(context);
|
this.connection = createConnection(context);
|
||||||
|
|
||||||
|
// connection check
|
||||||
|
if (this.connection != null) {
|
||||||
|
final Admin admin = this.connection.getAdmin();
|
||||||
|
if (admin != null) {
|
||||||
|
admin.listTableNames();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected Connection createConnection(final ConfigurationContext context) throws IOException {
|
protected Connection createConnection(final ConfigurationContext context) throws IOException {
|
||||||
final Configuration hbaseConfig = HBaseConfiguration.create();
|
final Configuration hbaseConfig = HBaseConfiguration.create();
|
||||||
for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) {
|
|
||||||
hbaseConfig.addResource(new Path(configFile.trim()));
|
// if conf files are provided, start with those
|
||||||
|
if (context.getProperty(HADOOP_CONF_FILES).isSet()) {
|
||||||
|
for (final String configFile : context.getProperty(HADOOP_CONF_FILES).getValue().split(",")) {
|
||||||
|
hbaseConfig.addResource(new Path(configFile.trim()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// override with any properties that are provided
|
||||||
|
if (context.getProperty(ZOOKEEPER_QUORUM).isSet()) {
|
||||||
|
hbaseConfig.set(HBASE_CONF_ZK_QUORUM, context.getProperty(ZOOKEEPER_QUORUM).getValue());
|
||||||
|
}
|
||||||
|
if (context.getProperty(ZOOKEEPER_CLIENT_PORT).isSet()) {
|
||||||
|
hbaseConfig.set(HBASE_CONF_ZK_PORT, context.getProperty(ZOOKEEPER_CLIENT_PORT).getValue());
|
||||||
|
}
|
||||||
|
if (context.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet()) {
|
||||||
|
hbaseConfig.set(HBASE_CONF_ZNODE_PARENT, context.getProperty(ZOOKEEPER_ZNODE_PARENT).getValue());
|
||||||
|
}
|
||||||
|
if (context.getProperty(HBASE_CLIENT_RETRIES).isSet()) {
|
||||||
|
hbaseConfig.set(HBASE_CONF_CLIENT_RETRIES, context.getProperty(HBASE_CLIENT_RETRIES).getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// add any dynamic properties to the HBase configuration
|
||||||
|
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||||
|
final PropertyDescriptor descriptor = entry.getKey();
|
||||||
|
if (descriptor.isDynamic()) {
|
||||||
|
hbaseConfig.set(descriptor.getName(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return ConnectionFactory.createConnection(hbaseConfig);
|
return ConnectionFactory.createConnection(hbaseConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,73 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestHBase_1_1_2_ClientService {
|
public class TestHBase_1_1_2_ClientService {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCustomValidate() throws InitializationException {
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||||
|
|
||||||
|
final String tableName = "nifi";
|
||||||
|
final Table table = Mockito.mock(Table.class);
|
||||||
|
when(table.getName()).thenReturn(TableName.valueOf(tableName));
|
||||||
|
|
||||||
|
// no conf file or zk properties so should be invalid
|
||||||
|
MockHBaseClientService service = new MockHBaseClientService(table);
|
||||||
|
runner.addControllerService("hbaseClientService", service);
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
|
// conf file with no zk properties should be valid
|
||||||
|
service = new MockHBaseClientService(table);
|
||||||
|
runner.addControllerService("hbaseClientService", service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertValid(service);
|
||||||
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
|
// only quorum and no conf file should be invalid
|
||||||
|
service = new MockHBaseClientService(table);
|
||||||
|
runner.addControllerService("hbaseClientService", service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
|
// quorum and port, no znode, no conf file, should be invalid
|
||||||
|
service = new MockHBaseClientService(table);
|
||||||
|
runner.addControllerService("hbaseClientService", service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
|
// quorum, port, and znode, no conf file, should be valid
|
||||||
|
service = new MockHBaseClientService(table);
|
||||||
|
runner.addControllerService("hbaseClientService", service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_ZNODE_PARENT, "/hbase");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertValid(service);
|
||||||
|
runner.removeControllerService(service);
|
||||||
|
|
||||||
|
// quorum and port with conf file should be valid
|
||||||
|
service = new MockHBaseClientService(table);
|
||||||
|
runner.addControllerService("hbaseClientService", service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site.xml");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_QUORUM, "localhost");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.ZOOKEEPER_CLIENT_PORT, "2181");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
|
runner.assertValid(service);
|
||||||
|
runner.removeControllerService(service);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSinglePut() throws InitializationException, IOException {
|
public void testSinglePut() throws InitializationException, IOException {
|
||||||
final String tableName = "nifi";
|
final String tableName = "nifi";
|
||||||
|
|
Loading…
Reference in New Issue