mirror of https://github.com/apache/nifi.git
NIFI-1488: Add HBase Kerberos Support with UGI
Added `kerberos-principal` and `kerberos-keytab` properties to the HBase service.
This commit is contained in:
parent
a2164136db
commit
7f15626af5
|
@ -333,6 +333,10 @@ public class StandardValidators {
|
||||||
|
|
||||||
public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true);
|
public static final Validator FILE_EXISTS_VALIDATOR = new FileExistsValidator(true);
|
||||||
|
|
||||||
|
private static final String PRINCIPAL_CHAR_CLASS = "[A-Za-z0-9\\\\\\/\\.@]";
|
||||||
|
|
||||||
|
public static final Validator KERB_PRINC_VALIDATOR = createRegexMatchingValidator(Pattern.compile(PRINCIPAL_CHAR_CLASS + "+" +
|
||||||
|
"@" + PRINCIPAL_CHAR_CLASS + "+"));
|
||||||
//
|
//
|
||||||
//
|
//
|
||||||
// FACTORY METHODS FOR VALIDATORS
|
// FACTORY METHODS FOR VALIDATORS
|
||||||
|
|
|
@ -86,4 +86,23 @@ public class TestStandardValidators {
|
||||||
assertFalse(vr.isValid());
|
assertFalse(vr.isValid());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKerbPrincipalValidator() {
|
||||||
|
Validator val = StandardValidators.KERB_PRINC_VALIDATOR;
|
||||||
|
ValidationResult vr;
|
||||||
|
|
||||||
|
final ValidationContext validationContext = Mockito.mock(ValidationContext.class);
|
||||||
|
vr = val.validate("Kerberos Principal","jon@CDH.PROD", validationContext);
|
||||||
|
assertTrue(vr.isValid());
|
||||||
|
|
||||||
|
vr = val.validate("Kerberos Principal","jon@CDH", validationContext);
|
||||||
|
assertTrue(vr.isValid());
|
||||||
|
|
||||||
|
vr = val.validate("kerberos-principal","service/nifi@PROD", validationContext);
|
||||||
|
assertTrue(vr.isValid());
|
||||||
|
|
||||||
|
vr = val.validate("keberos-principal", "joewitt", validationContext);
|
||||||
|
assertFalse(vr.isValid());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,10 +36,24 @@ public interface HBaseClientService extends ControllerService {
|
||||||
|
|
||||||
PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
|
PropertyDescriptor HADOOP_CONF_FILES = new PropertyDescriptor.Builder()
|
||||||
.name("Hadoop Configuration Files")
|
.name("Hadoop Configuration Files")
|
||||||
.description("Comma-separated list of Hadoop Configuration files, such as hbase-site.xml, including full paths to the files.")
|
.description("Comma-separated list of Hadoop Configuration files," +
|
||||||
|
" such as hbase-site.xml and core-site.xml for kerberos, " +
|
||||||
|
"including full paths to the files.")
|
||||||
.addValidator(new ConfigFilesValidator())
|
.addValidator(new ConfigFilesValidator())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
|
||||||
|
.name("kerberos-principal").displayName("Kerberos Principal")
|
||||||
|
.description("Principal of user writing to hbase").required(false)
|
||||||
|
.addValidator(StandardValidators.KERB_PRINC_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
PropertyDescriptor KERBEROS_KEYTAB = new PropertyDescriptor.Builder()
|
||||||
|
.name("kerberos-keytab").displayName("Kerberos Keytab")
|
||||||
|
.description("Path to keytab file").required(false)
|
||||||
|
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
|
PropertyDescriptor ZOOKEEPER_QUORUM = new PropertyDescriptor.Builder()
|
||||||
.name("ZooKeeper Quorum")
|
.name("ZooKeeper Quorum")
|
||||||
.description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.")
|
.description("Comma-separated list of ZooKeeper hosts for HBase. Required if Hadoop Configuration Files are not provided.")
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.nifi.hbase;
|
package org.apache.nifi.hbase;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.ParseFilter;
|
import org.apache.hadoop.hbase.filter.ParseFilter;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
import org.apache.nifi.annotation.documentation.Tags;
|
import org.apache.nifi.annotation.documentation.Tags;
|
||||||
|
@ -60,6 +62,11 @@ import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import javax.security.auth.login.LoginException;
|
||||||
|
|
||||||
@Tags({ "hbase", "client"})
|
@Tags({ "hbase", "client"})
|
||||||
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " +
|
@CapabilityDescription("Implementation of HBaseClientService for HBase 1.1.2. This service can be configured by providing " +
|
||||||
"a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " +
|
"a comma-separated list of configuration files, or by specifying values for the other properties. If configuration files " +
|
||||||
|
@ -69,7 +76,7 @@ import java.util.Map;
|
||||||
@DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.",
|
@DynamicProperty(name="The name of an HBase configuration property.", value="The value of the given HBase configuration property.",
|
||||||
description="These properties will be set on the HBase configuration after loading any provided configuration files.")
|
description="These properties will be set on the HBase configuration after loading any provided configuration files.")
|
||||||
public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
|
public class HBase_1_1_2_ClientService extends AbstractControllerService implements HBaseClientService {
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(HBase_1_1_2_ClientService.class);
|
||||||
static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
|
static final String HBASE_CONF_ZK_QUORUM = "hbase.zookeeper.quorum";
|
||||||
static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
|
static final String HBASE_CONF_ZK_PORT = "hbase.zookeeper.property.clientPort";
|
||||||
static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
|
static final String HBASE_CONF_ZNODE_PARENT = "zookeeper.znode.parent";
|
||||||
|
@ -78,10 +85,14 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
private volatile Connection connection;
|
private volatile Connection connection;
|
||||||
private List<PropertyDescriptor> properties;
|
private List<PropertyDescriptor> properties;
|
||||||
|
|
||||||
|
protected boolean isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
|
||||||
List<PropertyDescriptor> props = new ArrayList<>();
|
List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
props.add(HADOOP_CONF_FILES);
|
props.add(HADOOP_CONF_FILES);
|
||||||
|
props.add(KERBEROS_PRINCIPAL);
|
||||||
|
props.add(KERBEROS_KEYTAB);
|
||||||
props.add(ZOOKEEPER_QUORUM);
|
props.add(ZOOKEEPER_QUORUM);
|
||||||
props.add(ZOOKEEPER_CLIENT_PORT);
|
props.add(ZOOKEEPER_CLIENT_PORT);
|
||||||
props.add(ZOOKEEPER_ZNODE_PARENT);
|
props.add(ZOOKEEPER_ZNODE_PARENT);
|
||||||
|
@ -111,6 +122,8 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet();
|
boolean zkPortProvided = validationContext.getProperty(ZOOKEEPER_CLIENT_PORT).isSet();
|
||||||
boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet();
|
boolean znodeParentProvided = validationContext.getProperty(ZOOKEEPER_ZNODE_PARENT).isSet();
|
||||||
boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet();
|
boolean retriesProvided = validationContext.getProperty(HBASE_CLIENT_RETRIES).isSet();
|
||||||
|
boolean kerbprincProvided = validationContext.getProperty(KERBEROS_PRINCIPAL).isSet();
|
||||||
|
boolean kerbkeytabProvided = validationContext.getProperty(KERBEROS_KEYTAB).isSet();
|
||||||
|
|
||||||
final List<ValidationResult> problems = new ArrayList<>();
|
final List<ValidationResult> problems = new ArrayList<>();
|
||||||
|
|
||||||
|
@ -123,6 +136,14 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
.build());
|
.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (isSecurityEnabled && (!kerbprincProvided || !kerbkeytabProvided)) {
|
||||||
|
problems.add(new ValidationResult.Builder().valid(false)
|
||||||
|
.subject(this.getClass().getSimpleName()).explanation("Kerberos" +
|
||||||
|
" principal and keytab must be provided when using a secure " +
|
||||||
|
"hbase")
|
||||||
|
.build());
|
||||||
|
}
|
||||||
|
|
||||||
return problems;
|
return problems;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,7 +191,18 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
hbaseConfig.set(descriptor.getName(), entry.getValue());
|
hbaseConfig.set(descriptor.getName(), entry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
UserGroupInformation.setConfiguration(hbaseConfig);
|
||||||
|
isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
|
||||||
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
|
try{
|
||||||
|
UserGroupInformation.loginUserFromKeytab(context.getProperty(KERBEROS_PRINCIPAL).getValue(),
|
||||||
|
context.getProperty(KERBEROS_KEYTAB).getValue());
|
||||||
|
LOG.info("HBase Security Enabled, Logging in as User {}");
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.info("Simple Authentication");
|
||||||
|
}
|
||||||
return ConnectionFactory.createConnection(hbaseConfig);
|
return ConnectionFactory.createConnection(hbaseConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -187,6 +219,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
|
public void put(final String tableName, final Collection<PutFlowFile> puts) throws IOException {
|
||||||
|
UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab();
|
||||||
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
|
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
|
||||||
// Create one Put per row....
|
// Create one Put per row....
|
||||||
final Map<String, Put> rowPuts = new HashMap<>();
|
final Map<String, Put> rowPuts = new HashMap<>();
|
||||||
|
@ -211,6 +244,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
|
public void put(final String tableName, final String rowId, final Collection<PutColumn> columns) throws IOException {
|
||||||
|
UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab();
|
||||||
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
|
try (final Table table = connection.getTable(TableName.valueOf(tableName))) {
|
||||||
Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
|
Put put = new Put(rowId.getBytes(StandardCharsets.UTF_8));
|
||||||
for (final PutColumn column : columns) {
|
for (final PutColumn column : columns) {
|
||||||
|
@ -232,7 +266,7 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
ParseFilter parseFilter = new ParseFilter();
|
ParseFilter parseFilter = new ParseFilter();
|
||||||
filter = parseFilter.parseFilterString(filterExpression);
|
filter = parseFilter.parseFilterString(filterExpression);
|
||||||
}
|
}
|
||||||
|
UserGroupInformation.getBestUGI(null,null).checkTGTAndReloginFromKeytab();
|
||||||
try (final Table table = connection.getTable(TableName.valueOf(tableName));
|
try (final Table table = connection.getTable(TableName.valueOf(tableName));
|
||||||
final ResultScanner scanner = getResults(table, columns, filter, minTime)) {
|
final ResultScanner scanner = getResults(table, columns, filter, minTime)) {
|
||||||
|
|
||||||
|
@ -309,5 +343,4 @@ public class HBase_1_1_2_ClientService extends AbstractControllerService impleme
|
||||||
|
|
||||||
return table.getScanner(scan);
|
return table.getScanner(scan);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,6 +121,35 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
runner.enableControllerService(service);
|
runner.enableControllerService(service);
|
||||||
|
|
||||||
runner.assertValid(service);
|
runner.assertValid(service);
|
||||||
|
|
||||||
|
// Kerberos - principal with non-set keytab
|
||||||
|
runner.disableControllerService(service);
|
||||||
|
service.setIsSecurityEnabled(true);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.HADOOP_CONF_FILES, "src/test/resources/core-site-security.xml");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "test@REALM");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
|
||||||
|
// Kerberos - add valid options
|
||||||
|
runner.disableControllerService(service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/fake.keytab");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "test@REALM");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
runner.assertValid(service);
|
||||||
|
|
||||||
|
// Kerberos - add invalid non-existent keytab file
|
||||||
|
runner.disableControllerService(service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/missing.keytab");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
|
||||||
|
// Kerberos - add invalid principal
|
||||||
|
runner.disableControllerService(service);
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_KEYTAB, "src/test/resources/fake.keytab");
|
||||||
|
runner.setProperty(service, HBase_1_1_2_ClientService.KERBEROS_PRINCIPAL, "invalid");
|
||||||
|
runner.enableControllerService(service);
|
||||||
|
runner.assertNotValid(service);
|
||||||
|
|
||||||
runner.removeControllerService(service);
|
runner.removeControllerService(service);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,6 +414,10 @@ public class TestHBase_1_1_2_ClientService {
|
||||||
this.table = table;
|
this.table = table;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setIsSecurityEnabled(boolean value) {
|
||||||
|
this.isSecurityEnabled = value;
|
||||||
|
}
|
||||||
|
|
||||||
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
|
public void addResult(final String rowKey, final Map<String, String> cells, final long timestamp) {
|
||||||
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
|
final byte[] rowArray = rowKey.getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,30 @@
|
||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
|
||||||
|
<!--
|
||||||
|
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
contributor license agreements. See the NOTICE file distributed with
|
||||||
|
this work for additional information regarding copyright ownership.
|
||||||
|
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
(the "License"); you may not use this file except in compliance with
|
||||||
|
the License. You may obtain a copy of the License at
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
-->
|
||||||
|
<configuration>
|
||||||
|
<property>
|
||||||
|
<name>fs.default.name</name>
|
||||||
|
<value>hdfs://hbase</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.security.authentication</name>
|
||||||
|
<value>kerberos</value>
|
||||||
|
</property>
|
||||||
|
<property>
|
||||||
|
<name>hbase.security.authorization</name>
|
||||||
|
<value>true</value>
|
||||||
|
</property>
|
||||||
|
</configuration>
|
Loading…
Reference in New Issue