diff --git a/hbase-mapreduce/pom.xml b/hbase-mapreduce/pom.xml
index 3a0ae04486a..d219360f045 100644
--- a/hbase-mapreduce/pom.xml
+++ b/hbase-mapreduce/pom.xml
@@ -250,6 +250,11 @@
test-jar
test
+
+ org.apache.hadoop
+ hadoop-minikdc
+ test
+
org.mockito
mockito-core
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 451da8724b0..f1fdbf175f5 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -270,7 +270,8 @@ public class VerifyReplication extends Configured implements Tool {
if (!sourceResult.isEmpty()) {
context.getCounter(Counters.GOODROWS).increment(1);
if (verbose) {
- LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
+ LOG.info("Good row key (with recompare): " + delimiter +
+ Bytes.toStringBinary(row.getRow())
+ delimiter);
}
}
@@ -480,12 +481,16 @@ public class VerifyReplication extends Configured implements Tool {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
}
+ Configuration peerClusterConf;
if (peerId != null) {
assert peerConfigPair != null;
- Configuration peerClusterConf = peerConfigPair.getSecond();
- // Obtain the auth token from peer cluster
- TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+ peerClusterConf = peerConfigPair.getSecond();
+ } else {
+ peerClusterConf = HBaseConfiguration.createClusterConf(conf,
+ peerQuorumAddress, PEER_CONFIG_PREFIX);
}
+ // Obtain the auth token from peer cluster
+ TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
@@ -668,7 +673,8 @@ public class VerifyReplication extends Configured implements Tool {
// This is to avoid making recompare calls to source/peer tables when snapshots are used
if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
printUsage(
- "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
+ "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are"
+ + " immutable");
return false;
}
@@ -708,8 +714,8 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" without endtime means from starttime to forever");
System.err.println(" endtime end of the time range");
System.err.println(" versions number of cell versions to verify");
- System.err.println(" batch batch count for scan, " +
- "note that result row counts will no longer be actual number of rows when you use this option");
+ System.err.println(" batch batch count for scan, note that"
+ + " result row counts will no longer be actual number of rows when you use this option");
System.err.println(" raw includes raw scan if given in options");
System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
@@ -726,16 +732,64 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
System.err.println();
System.err.println("Args:");
- System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
+ System.err.println(" peerid Id of the peer used for verification,"
+ + " must match the one given for replication");
System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The "
+ "format is zk_quorum:zk_port:zk_hbase_path");
System.err.println(" tablename Name of the table to verify");
System.err.println();
System.err.println("Examples:");
- System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
+ System.err.println(
+ " To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
System.err.println(" $ hbase " +
"org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
" --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
+ System.err.println();
+ System.err.println(
+ " To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b");
+ System.err.println(" Assume quorum address for cluster-b is"
+ + " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
+ System.err.println(
+ " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
+ " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
+ + "2181:/cluster-b \\\n" +
+ " TestTable");
+ System.err.println();
+ System.err.println(
+ " To verify the data in TestTable between the secured cluster runs VerifyReplication"
+ + " and insecure cluster-b");
+ System.err.println(
+ " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
+ " -D verifyrep.peer.hbase.security.authentication=simple \\\n" +
+ " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
+ + "2181:/cluster-b \\\n" +
+ " TestTable");
+ System.err.println();
+ System.err.println(" To verify the data in TestTable between" +
+ " the secured cluster runs VerifyReplication and secured cluster-b");
+ System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E" +
+ ", for master and regionserver kerberos principal from another cluster");
+ System.err.println(
+ " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
+ " -D verifyrep.peer.hbase.regionserver.kerberos.principal="
+ + "cluster-b/_HOST@EXAMPLE.COM \\\n" +
+ " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" +
+ " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
+ + "2181:/cluster-b \\\n" +
+ " TestTable");
+ System.err.println();
+ System.err.println(
+ " To verify the data in TestTable between the insecure cluster runs VerifyReplication"
+ + " and secured cluster-b");
+ System.err.println(
+ " $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
+ " -D verifyrep.peer.hbase.security.authentication=kerberos \\\n" +
+ " -D verifyrep.peer.hbase.regionserver.kerberos.principal="
+ + "cluster-b/_HOST@EXAMPLE.COM \\\n" +
+ " -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" +
+ " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
+ + "2181:/cluster-b \\\n" +
+ " TestTable");
}
@Override
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationSecureClusterCredentials.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationSecureClusterCredentials.java
new file mode 100644
index 00000000000..7df9640b525
--- /dev/null
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplicationSecureClusterCredentials.java
@@ -0,0 +1,166 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
+import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.access.AccessController;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
+import org.apache.hadoop.hbase.security.token.TokenProvider;
+import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+@RunWith(Parameterized.class)
+public class TestVerifyReplicationSecureClusterCredentials {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestVerifyReplicationSecureClusterCredentials.class);
+
+ private static MiniKdc KDC;
+ private static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+ private static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+ private static final File KEYTAB_FILE =
+ new File(UTIL1.getDataTestDir("keytab").toUri().getPath());
+
+ private static final String LOCALHOST = "localhost";
+ private static String CLUSTER_PRINCIPAL;
+ private static String FULL_USER_PRINCIPAL;
+ private static String HTTP_PRINCIPAL;
+
+ private static void setUpKdcServer() throws Exception {
+ KDC = UTIL1.setupMiniKdc(KEYTAB_FILE);
+ String username = UserGroupInformation.getLoginUser().getShortUserName();
+ String userPrincipal = username + '/' + LOCALHOST;
+ CLUSTER_PRINCIPAL = userPrincipal;
+ FULL_USER_PRINCIPAL = userPrincipal + '@' + KDC.getRealm();
+ HTTP_PRINCIPAL = "HTTP/" + LOCALHOST;
+ KDC.createPrincipal(KEYTAB_FILE, CLUSTER_PRINCIPAL, HTTP_PRINCIPAL);
+ }
+
+ private static void setupCluster(HBaseTestingUtility util) throws Exception {
+ Configuration conf = util.getConfiguration();
+
+ SecureTestUtil.enableSecurity(conf);
+ VisibilityTestUtil.enableVisiblityLabels(conf);
+ SecureTestUtil.verifyConfiguration(conf);
+
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ AccessController.class.getName() + ',' + TokenProvider.class.getName());
+
+ HBaseKerberosUtils.setSecuredConfiguration(conf,
+ CLUSTER_PRINCIPAL + '@' + KDC.getRealm(), HTTP_PRINCIPAL + '@' + KDC.getRealm());
+
+ util.startMiniCluster();
+ }
+
+ /**
+ * Sets the security firstly for getting the correct default realm.
+ */
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ setUpKdcServer();
+ setupCluster(UTIL1);
+ setupCluster(UTIL2);
+
+ try (Admin admin = UTIL1.getAdmin()) {
+ admin.addReplicationPeer("1", ReplicationPeerConfig.newBuilder()
+ .setClusterKey(ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration()))
+ .putConfiguration(HBaseKerberosUtils.KRB_PRINCIPAL,
+ UTIL2.getConfiguration().get(HBaseKerberosUtils.KRB_PRINCIPAL))
+ .putConfiguration(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL,
+ UTIL2.getConfiguration().get(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL))
+ .build());
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() throws IOException {
+ UTIL1.shutdownMiniCluster();
+ UTIL2.shutdownMiniCluster();
+ }
+
+ @Parameters
+ public static Collection> peer() {
+ return Arrays.asList(
+ () -> "1",
+ () -> ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration())
+ );
+ }
+
+ @Parameter
+ public Supplier peer;
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testJobCredentials() throws Exception {
+ Job job = new VerifyReplication().createSubmittableJob(
+ new Configuration(UTIL1.getConfiguration()),
+ new String[] {
+ peer.get(),
+ "table"
+ });
+
+ Credentials credentials = job.getCredentials();
+ Collection> tokens = credentials.getAllTokens();
+ assertEquals(2, tokens.size());
+
+ String clusterId1 = ZKClusterId.readClusterIdZNode(UTIL1.getZooKeeperWatcher());
+ Token tokenForCluster1 =
+ (Token) credentials.getToken(new Text(clusterId1));
+ assertEquals(FULL_USER_PRINCIPAL, tokenForCluster1.decodeIdentifier().getUsername());
+
+ String clusterId2 = ZKClusterId.readClusterIdZNode(UTIL2.getZooKeeperWatcher());
+ Token tokenForCluster2 =
+ (Token) credentials.getToken(new Text(clusterId2));
+ assertEquals(FULL_USER_PRINCIPAL, tokenForCluster2.decodeIdentifier().getUsername());
+ }
+}