diff --git a/hbase-mapreduce/pom.xml b/hbase-mapreduce/pom.xml index 52fd92f2495..e5019f47d71 100644 --- a/hbase-mapreduce/pom.xml +++ b/hbase-mapreduce/pom.xml @@ -250,6 +250,11 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minikdc</artifactId> + <scope>test</scope> + </dependency> <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-core</artifactId> diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java index 451da8724b0..f1fdbf175f5 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java @@ -270,7 +270,8 @@ public class VerifyReplication extends Configured implements Tool { if (!sourceResult.isEmpty()) { context.getCounter(Counters.GOODROWS).increment(1); if (verbose) { - LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow()) + LOG.info("Good row key (with recompare): " + delimiter + + Bytes.toStringBinary(row.getRow()) + delimiter); } } @@ -480,12 +481,16 @@ public class VerifyReplication extends Configured implements Tool { TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job); } + Configuration peerClusterConf; if (peerId != null) { assert peerConfigPair != null; - Configuration peerClusterConf = peerConfigPair.getSecond(); - // Obtain the auth token from peer cluster - TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); + peerClusterConf = peerConfigPair.getSecond(); + } else { + peerClusterConf = HBaseConfiguration.createClusterConf(conf, + peerQuorumAddress, PEER_CONFIG_PREFIX); } + // Obtain the auth token from peer cluster + TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf); job.setOutputFormatClass(NullOutputFormat.class); job.setNumReduceTasks(0); @@ -668,7 +673,8 @@ public class VerifyReplication extends Configured
implements Tool { // This is to avoid making recompare calls to source/peer tables when snapshots are used if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) { printUsage( - "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable"); + "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are" + + " immutable"); return false; } @@ -708,8 +714,8 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" without endtime means from starttime to forever"); System.err.println(" endtime end of the time range"); System.err.println(" versions number of cell versions to verify"); - System.err.println(" batch batch count for scan, " + - "note that result row counts will no longer be actual number of rows when you use this option"); + System.err.println(" batch batch count for scan, note that" + + " result row counts will no longer be actual number of rows when you use this option"); System.err.println(" raw includes raw scan if given in options"); System.err.println(" families comma-separated list of families to copy"); System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on "); @@ -726,16 +732,64 @@ public class VerifyReplication extends Configured implements Tool { System.err.println(" peerHBaseRootAddress Peer cluster HBase root location"); System.err.println(); System.err.println("Args:"); - System.err.println(" peerid Id of the peer used for verification, must match the one given for replication"); + System.err.println(" peerid Id of the peer used for verification," + + " must match the one given for replication"); System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. 
The " + "format is zk_quorum:zk_port:zk_hbase_path"); System.err.println(" tablename Name of the table to verify"); System.err.println(); System.err.println("Examples:"); - System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 "); + System.err.println( + " To verify the data replicated from TestTable for a 1 hour window with peer #5 "); System.err.println(" $ hbase " + "org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" + " --starttime=1265875194289 --endtime=1265878794289 5 TestTable "); + System.err.println(); + System.err.println( + " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b"); + System.err.println(" Assume quorum address for cluster-b is" + + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b"); + System.err.println( + " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" + + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" + + "2181:/cluster-b \\\n" + + " TestTable"); + System.err.println(); + System.err.println( + " To verify the data in TestTable between the secured cluster runs VerifyReplication" + + " and insecure cluster-b"); + System.err.println( + " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" + + " -D verifyrep.peer.hbase.security.authentication=simple \\\n" + + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" + + "2181:/cluster-b \\\n" + + " TestTable"); + System.err.println(); + System.err.println(" To verify the data in TestTable between" + + " the secured cluster runs VerifyReplication and secured cluster-b"); + System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E" + + ", for master and regionserver kerberos principal from another cluster"); + System.err.println( + " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" + + " -D 
verifyrep.peer.hbase.regionserver.kerberos.principal=" + + "cluster-b/_HOST@EXAMPLE.COM \\\n" + + " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" + + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" + + "2181:/cluster-b \\\n" + + " TestTable"); + System.err.println(); + System.err.println( + " To verify the data in TestTable between the insecure cluster runs VerifyReplication" + + " and secured cluster-b"); + System.err.println( + " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" + + " -D verifyrep.peer.hbase.security.authentication=kerberos \\\n" + + " -D verifyrep.peer.hbase.regionserver.kerberos.principal=" + + "cluster-b/_HOST@EXAMPLE.COM \\\n" + + " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" + + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:" + + "2181:/cluster-b \\\n" + + " TestTable"); } @Override diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationSecureClusterCredentials.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationSecureClusterCredentials.java new file mode 100644 index 00000000000..7df9640b525 --- /dev/null +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationSecureClusterCredentials.java @@ -0,0 +1,166 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication; + +import static org.junit.Assert.assertEquals; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication; +import org.apache.hadoop.hbase.security.HBaseKerberosUtils; +import org.apache.hadoop.hbase.security.access.AccessController; +import org.apache.hadoop.hbase.security.access.SecureTestUtil; +import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier; +import org.apache.hadoop.hbase.security.token.TokenProvider; +import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.ReplicationTests; +import org.apache.hadoop.hbase.zookeeper.ZKClusterId; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.junit.AfterClass; 
+import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; + +@Category({ ReplicationTests.class, LargeTests.class }) +@RunWith(Parameterized.class) +public class TestVerifyReplicationSecureClusterCredentials { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestVerifyReplicationSecureClusterCredentials.class); + + private static MiniKdc KDC; + private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); + private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); + + private static final File KEYTAB_FILE = + new File(UTIL1.getDataTestDir("keytab").toUri().getPath()); + + private static final String LOCALHOST = "localhost"; + private static String CLUSTER_PRINCIPAL; + private static String FULL_USER_PRINCIPAL; + private static String HTTP_PRINCIPAL; + + private static void setUpKdcServer() throws Exception { + KDC = UTIL1.setupMiniKdc(KEYTAB_FILE); + String username = UserGroupInformation.getLoginUser().getShortUserName(); + String userPrincipal = username + '/' + LOCALHOST; + CLUSTER_PRINCIPAL = userPrincipal; + FULL_USER_PRINCIPAL = userPrincipal + '@' + KDC.getRealm(); + HTTP_PRINCIPAL = "HTTP/" + LOCALHOST; + KDC.createPrincipal(KEYTAB_FILE, CLUSTER_PRINCIPAL, HTTP_PRINCIPAL); + } + + private static void setupCluster(HBaseTestingUtility util) throws Exception { + Configuration conf = util.getConfiguration(); + + SecureTestUtil.enableSecurity(conf); + VisibilityTestUtil.enableVisiblityLabels(conf); + SecureTestUtil.verifyConfiguration(conf); + + conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + AccessController.class.getName() + ',' + TokenProvider.class.getName()); + + HBaseKerberosUtils.setSecuredConfiguration(conf, + 
CLUSTER_PRINCIPAL + '@' + KDC.getRealm(), HTTP_PRINCIPAL + '@' + KDC.getRealm()); + + util.startMiniCluster(); + } + + /** + * Sets the security firstly for getting the correct default realm. + */ + @BeforeClass + public static void beforeClass() throws Exception { + setUpKdcServer(); + setupCluster(UTIL1); + setupCluster(UTIL2); + + try (Admin admin = UTIL1.getAdmin()) { + admin.addReplicationPeer("1", ReplicationPeerConfig.newBuilder() + .setClusterKey(ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration())) + .putConfiguration(HBaseKerberosUtils.KRB_PRINCIPAL, + UTIL2.getConfiguration().get(HBaseKerberosUtils.KRB_PRINCIPAL)) + .putConfiguration(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL, + UTIL2.getConfiguration().get(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL)) + .build()); + } + } + + @AfterClass + public static void cleanup() throws IOException { + UTIL1.shutdownMiniCluster(); + UTIL2.shutdownMiniCluster(); + } + + @Parameters + public static Collection<Supplier<String>> peer() { + return Arrays.asList( + () -> "1", + () -> ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration()) + ); + } + + @Parameter + public Supplier<String> peer; + + @Test + @SuppressWarnings("unchecked") + public void testJobCredentials() throws Exception { + Job job = new VerifyReplication().createSubmittableJob( + new Configuration(UTIL1.getConfiguration()), + new String[] { + peer.get(), + "table" + }); + + Credentials credentials = job.getCredentials(); + Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens(); + assertEquals(2, tokens.size()); + + String clusterId1 = ZKClusterId.readClusterIdZNode(UTIL1.getZooKeeperWatcher()); + Token<AuthenticationTokenIdentifier> tokenForCluster1 = + (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId1)); + assertEquals(FULL_USER_PRINCIPAL, tokenForCluster1.decodeIdentifier().getUsername()); + + String clusterId2 = ZKClusterId.readClusterIdZNode(UTIL2.getZooKeeperWatcher()); + Token<AuthenticationTokenIdentifier> tokenForCluster2 = + (Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId2)); + assertEquals(FULL_USER_PRINCIPAL,
tokenForCluster2.decodeIdentifier().getUsername()); + } +}