HBASE-26205 Use specified cluster conf for UserProvider in TableMRUtil#initCredentialsForCluster (#3592)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Reviewed-by: Rushabh Shah <shahrs87@gmail.com>
This commit is contained in:
parent
cf4c764d16
commit
875f3afc2a
|
@ -585,7 +585,7 @@ public class TableMapReduceUtil {
|
||||||
*/
|
*/
|
||||||
public static void initCredentialsForCluster(Job job, Configuration conf)
|
public static void initCredentialsForCluster(Job job, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
|
UserProvider userProvider = UserProvider.instantiate(conf);
|
||||||
if (userProvider.isHBaseSecurityEnabled()) {
|
if (userProvider.isHBaseSecurityEnabled()) {
|
||||||
try {
|
try {
|
||||||
Connection peerConn = ConnectionFactory.createConnection(conf);
|
Connection peerConn = ConnectionFactory.createConnection(conf);
|
||||||
|
|
|
@ -17,18 +17,40 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.mapreduce;
|
package org.apache.hadoop.hbase.mapreduce;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.Collection;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||||
|
import org.apache.hadoop.hbase.security.access.AccessController;
|
||||||
|
import org.apache.hadoop.hbase.security.access.PermissionStorage;
|
||||||
|
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
|
||||||
|
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
|
||||||
|
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hbase.security.token.TokenProvider;
|
||||||
|
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
|
||||||
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
|
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||||
import org.apache.hadoop.io.LongWritable;
|
import org.apache.hadoop.io.LongWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -38,11 +60,17 @@ import org.junit.experimental.categories.Category;
|
||||||
*/
|
*/
|
||||||
@Category({MapReduceTests.class, MediumTests.class})
|
@Category({MapReduceTests.class, MediumTests.class})
|
||||||
public class TestTableMapReduceUtil {
|
public class TestTableMapReduceUtil {
|
||||||
|
private static final String HTTP_PRINCIPAL = "HTTP/localhost";
|
||||||
|
|
||||||
@ClassRule
|
@ClassRule
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
public static final HBaseClassTestRule CLASS_RULE =
|
||||||
HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);
|
HBaseClassTestRule.forClass(TestTableMapReduceUtil.class);
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void after() {
|
||||||
|
SaslClientAuthenticationProviders.reset();
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
|
* initTableSnapshotMapperJob is tested in {@link TestTableSnapshotInputFormat} because
|
||||||
* the method depends on an online cluster.
|
* the method depends on an online cluster.
|
||||||
|
@ -51,10 +79,11 @@ public class TestTableMapReduceUtil {
|
||||||
@Test
|
@Test
|
||||||
public void testInitTableMapperJob1() throws Exception {
|
public void testInitTableMapperJob1() throws Exception {
|
||||||
Configuration configuration = new Configuration();
|
Configuration configuration = new Configuration();
|
||||||
Job job = new Job(configuration, "tableName");
|
Job job = Job.getInstance(configuration, "tableName");
|
||||||
// test
|
// test
|
||||||
TableMapReduceUtil.initTableMapperJob("Table", new Scan(), Import.Importer.class, Text.class,
|
TableMapReduceUtil.initTableMapperJob(
|
||||||
Text.class, job, false, WALInputFormat.class);
|
"Table", new Scan(), Import.Importer.class, Text.class, Text.class, job,
|
||||||
|
false, WALInputFormat.class);
|
||||||
assertEquals(WALInputFormat.class, job.getInputFormatClass());
|
assertEquals(WALInputFormat.class, job.getInputFormatClass());
|
||||||
assertEquals(Import.Importer.class, job.getMapperClass());
|
assertEquals(Import.Importer.class, job.getMapperClass());
|
||||||
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
||||||
|
@ -66,9 +95,10 @@ public class TestTableMapReduceUtil {
|
||||||
@Test
|
@Test
|
||||||
public void testInitTableMapperJob2() throws Exception {
|
public void testInitTableMapperJob2() throws Exception {
|
||||||
Configuration configuration = new Configuration();
|
Configuration configuration = new Configuration();
|
||||||
Job job = new Job(configuration, "tableName");
|
Job job = Job.getInstance(configuration, "tableName");
|
||||||
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
TableMapReduceUtil.initTableMapperJob(
|
||||||
Import.Importer.class, Text.class, Text.class, job, false, WALInputFormat.class);
|
Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class,
|
||||||
|
Text.class, job, false, WALInputFormat.class);
|
||||||
assertEquals(WALInputFormat.class, job.getInputFormatClass());
|
assertEquals(WALInputFormat.class, job.getInputFormatClass());
|
||||||
assertEquals(Import.Importer.class, job.getMapperClass());
|
assertEquals(Import.Importer.class, job.getMapperClass());
|
||||||
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
||||||
|
@ -80,9 +110,10 @@ public class TestTableMapReduceUtil {
|
||||||
@Test
|
@Test
|
||||||
public void testInitTableMapperJob3() throws Exception {
|
public void testInitTableMapperJob3() throws Exception {
|
||||||
Configuration configuration = new Configuration();
|
Configuration configuration = new Configuration();
|
||||||
Job job = new Job(configuration, "tableName");
|
Job job = Job.getInstance(configuration, "tableName");
|
||||||
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
TableMapReduceUtil.initTableMapperJob(
|
||||||
Import.Importer.class, Text.class, Text.class, job);
|
Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class,
|
||||||
|
Text.class, job);
|
||||||
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
||||||
assertEquals(Import.Importer.class, job.getMapperClass());
|
assertEquals(Import.Importer.class, job.getMapperClass());
|
||||||
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
||||||
|
@ -94,9 +125,10 @@ public class TestTableMapReduceUtil {
|
||||||
@Test
|
@Test
|
||||||
public void testInitTableMapperJob4() throws Exception {
|
public void testInitTableMapperJob4() throws Exception {
|
||||||
Configuration configuration = new Configuration();
|
Configuration configuration = new Configuration();
|
||||||
Job job = new Job(configuration, "tableName");
|
Job job = Job.getInstance(configuration, "tableName");
|
||||||
TableMapReduceUtil.initTableMapperJob(Bytes.toBytes("Table"), new Scan(),
|
TableMapReduceUtil.initTableMapperJob(
|
||||||
Import.Importer.class, Text.class, Text.class, job, false);
|
Bytes.toBytes("Table"), new Scan(), Import.Importer.class, Text.class,
|
||||||
|
Text.class, job, false);
|
||||||
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
assertEquals(TableInputFormat.class, job.getInputFormatClass());
|
||||||
assertEquals(Import.Importer.class, job.getMapperClass());
|
assertEquals(Import.Importer.class, job.getMapperClass());
|
||||||
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
assertEquals(LongWritable.class, job.getOutputKeyClass());
|
||||||
|
@ -104,4 +136,155 @@ public class TestTableMapReduceUtil {
|
||||||
assertNull(job.getCombinerClass());
|
assertNull(job.getCombinerClass());
|
||||||
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static Closeable startSecureMiniCluster(
|
||||||
|
HBaseTestingUtility util, MiniKdc kdc, String principal) throws Exception {
|
||||||
|
Configuration conf = util.getConfiguration();
|
||||||
|
|
||||||
|
SecureTestUtil.enableSecurity(conf);
|
||||||
|
VisibilityTestUtil.enableVisiblityLabels(conf);
|
||||||
|
SecureTestUtil.verifyConfiguration(conf);
|
||||||
|
|
||||||
|
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||||
|
AccessController.class.getName() + ',' + TokenProvider.class.getName());
|
||||||
|
|
||||||
|
HBaseKerberosUtils.setSecuredConfiguration(conf,
|
||||||
|
principal + '@' + kdc.getRealm(), HTTP_PRINCIPAL + '@' + kdc.getRealm());
|
||||||
|
|
||||||
|
util.startMiniCluster();
|
||||||
|
try {
|
||||||
|
util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
|
||||||
|
} catch (Exception e) {
|
||||||
|
util.shutdownMiniCluster();
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
return util::shutdownMiniCluster;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testInitCredentialsForCluster1() throws Exception {
|
||||||
|
HBaseTestingUtility util1 = new HBaseTestingUtility();
|
||||||
|
HBaseTestingUtility util2 = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
util1.startMiniCluster();
|
||||||
|
try {
|
||||||
|
util2.startMiniCluster();
|
||||||
|
try {
|
||||||
|
Configuration conf1 = util1.getConfiguration();
|
||||||
|
Job job = Job.getInstance(conf1);
|
||||||
|
|
||||||
|
TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
|
||||||
|
|
||||||
|
Credentials credentials = job.getCredentials();
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
|
||||||
|
assertTrue(tokens.isEmpty());
|
||||||
|
} finally {
|
||||||
|
util2.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
util1.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test @SuppressWarnings("unchecked") public void testInitCredentialsForCluster2()
|
||||||
|
throws Exception {
|
||||||
|
HBaseTestingUtility util1 = new HBaseTestingUtility();
|
||||||
|
HBaseTestingUtility util2 = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
|
||||||
|
MiniKdc kdc = util1.setupMiniKdc(keytab);
|
||||||
|
try {
|
||||||
|
String username = UserGroupInformation.getLoginUser().getShortUserName();
|
||||||
|
String userPrincipal = username + "/localhost";
|
||||||
|
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
|
||||||
|
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
|
||||||
|
|
||||||
|
try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
|
||||||
|
Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
|
||||||
|
Configuration conf1 = util1.getConfiguration();
|
||||||
|
Job job = Job.getInstance(conf1);
|
||||||
|
|
||||||
|
TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
|
||||||
|
|
||||||
|
Credentials credentials = job.getCredentials();
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
|
||||||
|
assertEquals(1, tokens.size());
|
||||||
|
|
||||||
|
String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
|
||||||
|
Token<AuthenticationTokenIdentifier> tokenForCluster =
|
||||||
|
(Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
|
||||||
|
assertEquals(userPrincipal + '@' + kdc.getRealm(),
|
||||||
|
tokenForCluster.decodeIdentifier().getUsername());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
kdc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void testInitCredentialsForCluster3() throws Exception {
|
||||||
|
HBaseTestingUtility util1 = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
File keytab = new File(util1.getDataTestDir("keytab").toUri().getPath());
|
||||||
|
MiniKdc kdc = util1.setupMiniKdc(keytab);
|
||||||
|
try {
|
||||||
|
String username = UserGroupInformation.getLoginUser().getShortUserName();
|
||||||
|
String userPrincipal = username + "/localhost";
|
||||||
|
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
|
||||||
|
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
|
||||||
|
|
||||||
|
try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
|
||||||
|
HBaseTestingUtility util2 = new HBaseTestingUtility();
|
||||||
|
// Assume util2 is insecure cluster
|
||||||
|
// Do not start util2 because cannot boot secured mini cluster and insecure mini cluster at
|
||||||
|
// once
|
||||||
|
|
||||||
|
Configuration conf1 = util1.getConfiguration();
|
||||||
|
Job job = Job.getInstance(conf1);
|
||||||
|
|
||||||
|
TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
|
||||||
|
|
||||||
|
Credentials credentials = job.getCredentials();
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
|
||||||
|
assertTrue(tokens.isEmpty());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
kdc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test @SuppressWarnings("unchecked") public void testInitCredentialsForCluster4()
|
||||||
|
throws Exception {
|
||||||
|
HBaseTestingUtility util1 = new HBaseTestingUtility();
|
||||||
|
// Assume util1 is insecure cluster
|
||||||
|
// Do not start util1 because cannot boot secured mini cluster and insecure mini cluster at once
|
||||||
|
|
||||||
|
HBaseTestingUtility util2 = new HBaseTestingUtility();
|
||||||
|
File keytab = new File(util2.getDataTestDir("keytab").toUri().getPath());
|
||||||
|
MiniKdc kdc = util2.setupMiniKdc(keytab);
|
||||||
|
try {
|
||||||
|
String username = UserGroupInformation.getLoginUser().getShortUserName();
|
||||||
|
String userPrincipal = username + "/localhost";
|
||||||
|
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
|
||||||
|
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
|
||||||
|
|
||||||
|
try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
|
||||||
|
Configuration conf1 = util1.getConfiguration();
|
||||||
|
Job job = Job.getInstance(conf1);
|
||||||
|
|
||||||
|
TableMapReduceUtil.initCredentialsForCluster(job, util2.getConfiguration());
|
||||||
|
|
||||||
|
Credentials credentials = job.getCredentials();
|
||||||
|
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
|
||||||
|
assertEquals(1, tokens.size());
|
||||||
|
|
||||||
|
String clusterId = ZKClusterId.readClusterIdZNode(util2.getZooKeeperWatcher());
|
||||||
|
Token<AuthenticationTokenIdentifier> tokenForCluster =
|
||||||
|
(Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId));
|
||||||
|
assertEquals(userPrincipal + '@' + kdc.getRealm(),
|
||||||
|
tokenForCluster.decodeIdentifier().getUsername());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
kdc.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue