HBASE-26204 Obtain credential for VerifyReplication with peerQuorumAddress (#3591)
Signed-off-by: Rushabh Shah <shahrs87@gmail.com> Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
This commit is contained in:
parent
6ed03d98ef
commit
d781113a08
hbase-mapreduce
pom.xml
src
main/java/org/apache/hadoop/hbase/mapreduce/replication
test/java/org/apache/hadoop/hbase/replication
|
@ -226,6 +226,11 @@
|
|||
<type>test-jar</type>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-minikdc</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-core</artifactId>
|
||||
|
|
|
@ -270,7 +270,8 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
if (!sourceResult.isEmpty()) {
|
||||
context.getCounter(Counters.GOODROWS).increment(1);
|
||||
if (verbose) {
|
||||
LOG.info("Good row key (with recompare): " + delimiter + Bytes.toStringBinary(row.getRow())
|
||||
LOG.info("Good row key (with recompare): " + delimiter +
|
||||
Bytes.toStringBinary(row.getRow())
|
||||
+ delimiter);
|
||||
}
|
||||
}
|
||||
|
@ -480,12 +481,16 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
|
||||
}
|
||||
|
||||
Configuration peerClusterConf;
|
||||
if (peerId != null) {
|
||||
assert peerConfigPair != null;
|
||||
Configuration peerClusterConf = peerConfigPair.getSecond();
|
||||
// Obtain the auth token from peer cluster
|
||||
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
|
||||
peerClusterConf = peerConfigPair.getSecond();
|
||||
} else {
|
||||
peerClusterConf = HBaseConfiguration.createClusterConf(conf,
|
||||
peerQuorumAddress, PEER_CONFIG_PREFIX);
|
||||
}
|
||||
// Obtain the auth token from peer cluster
|
||||
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
|
||||
|
||||
job.setOutputFormatClass(NullOutputFormat.class);
|
||||
job.setNumReduceTasks(0);
|
||||
|
@ -668,7 +673,8 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
// This is to avoid making recompare calls to source/peer tables when snapshots are used
|
||||
if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
|
||||
printUsage(
|
||||
"Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
|
||||
"Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are"
|
||||
+ " immutable");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -708,8 +714,8 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
System.err.println(" without endtime means from starttime to forever");
|
||||
System.err.println(" endtime end of the time range");
|
||||
System.err.println(" versions number of cell versions to verify");
|
||||
System.err.println(" batch batch count for scan, " +
|
||||
"note that result row counts will no longer be actual number of rows when you use this option");
|
||||
System.err.println(" batch batch count for scan, note that"
|
||||
+ " result row counts will no longer be actual number of rows when you use this option");
|
||||
System.err.println(" raw includes raw scan if given in options");
|
||||
System.err.println(" families comma-separated list of families to copy");
|
||||
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
|
||||
|
@ -726,16 +732,64 @@ public class VerifyReplication extends Configured implements Tool {
|
|||
System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
|
||||
System.err.println();
|
||||
System.err.println("Args:");
|
||||
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
|
||||
System.err.println(" peerid Id of the peer used for verification,"
|
||||
+ " must match the one given for replication");
|
||||
System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The "
|
||||
+ "format is zk_quorum:zk_port:zk_hbase_path");
|
||||
System.err.println(" tablename Name of the table to verify");
|
||||
System.err.println();
|
||||
System.err.println("Examples:");
|
||||
System.err.println(" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
|
||||
System.err.println(
|
||||
" To verify the data replicated from TestTable for a 1 hour window with peer #5 ");
|
||||
System.err.println(" $ hbase " +
|
||||
"org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication" +
|
||||
" --starttime=1265875194289 --endtime=1265878794289 5 TestTable ");
|
||||
System.err.println();
|
||||
System.err.println(
|
||||
" To verify the data in TestTable between the cluster runs VerifyReplication and cluster-b");
|
||||
System.err.println(" Assume quorum address for cluster-b is"
|
||||
+ " cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:2181:/cluster-b");
|
||||
System.err.println(
|
||||
" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
|
||||
" cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
|
||||
+ "2181:/cluster-b \\\n" +
|
||||
" TestTable");
|
||||
System.err.println();
|
||||
System.err.println(
|
||||
" To verify the data in TestTable between the secured cluster runs VerifyReplication"
|
||||
+ " and insecure cluster-b");
|
||||
System.err.println(
|
||||
" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
|
||||
" -D verifyrep.peer.hbase.security.authentication=simple \\\n" +
|
||||
" cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
|
||||
+ "2181:/cluster-b \\\n" +
|
||||
" TestTable");
|
||||
System.err.println();
|
||||
System.err.println(" To verify the data in TestTable between" +
|
||||
" the secured cluster runs VerifyReplication and secured cluster-b");
|
||||
System.err.println(" Assume cluster-b uses different kerberos principal, cluster-b/_HOST@E" +
|
||||
", for master and regionserver kerberos principal from another cluster");
|
||||
System.err.println(
|
||||
" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
|
||||
" -D verifyrep.peer.hbase.regionserver.kerberos.principal="
|
||||
+ "cluster-b/_HOST@EXAMPLE.COM \\\n" +
|
||||
" -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" +
|
||||
" cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
|
||||
+ "2181:/cluster-b \\\n" +
|
||||
" TestTable");
|
||||
System.err.println();
|
||||
System.err.println(
|
||||
" To verify the data in TestTable between the insecure cluster runs VerifyReplication"
|
||||
+ " and secured cluster-b");
|
||||
System.err.println(
|
||||
" $ hbase org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication \\\n" +
|
||||
" -D verifyrep.peer.hbase.security.authentication=kerberos \\\n" +
|
||||
" -D verifyrep.peer.hbase.regionserver.kerberos.principal="
|
||||
+ "cluster-b/_HOST@EXAMPLE.COM \\\n" +
|
||||
" -D verifyrep.peer.hbase.master.kerberos.principal=cluster-b/_HOST@EXAMPLE.COM \\\n" +
|
||||
" cluster-b-1.example.com,cluster-b-2.example.com,cluster-b-3.example.com:"
|
||||
+ "2181:/cluster-b \\\n" +
|
||||
" TestTable");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,166 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.function.Supplier;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtil;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
|
||||
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
|
||||
import org.apache.hadoop.hbase.security.access.AccessController;
|
||||
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
|
||||
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
|
||||
import org.apache.hadoop.hbase.security.token.TokenProvider;
|
||||
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameter;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
|
||||
@Category({ ReplicationTests.class, LargeTests.class })
|
||||
@RunWith(Parameterized.class)
|
||||
public class TestVerifyReplicationSecureClusterCredentials {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestVerifyReplicationSecureClusterCredentials.class);
|
||||
|
||||
private static MiniKdc KDC;
|
||||
private static final HBaseTestingUtil UTIL1 = new HBaseTestingUtil();
|
||||
private static final HBaseTestingUtil UTIL2 = new HBaseTestingUtil();
|
||||
|
||||
private static final File KEYTAB_FILE =
|
||||
new File(UTIL1.getDataTestDir("keytab").toUri().getPath());
|
||||
|
||||
private static final String LOCALHOST = "localhost";
|
||||
private static String CLUSTER_PRINCIPAL;
|
||||
private static String FULL_USER_PRINCIPAL;
|
||||
private static String HTTP_PRINCIPAL;
|
||||
|
||||
private static void setUpKdcServer() throws Exception {
|
||||
KDC = UTIL1.setupMiniKdc(KEYTAB_FILE);
|
||||
String username = UserGroupInformation.getLoginUser().getShortUserName();
|
||||
String userPrincipal = username + '/' + LOCALHOST;
|
||||
CLUSTER_PRINCIPAL = userPrincipal;
|
||||
FULL_USER_PRINCIPAL = userPrincipal + '@' + KDC.getRealm();
|
||||
HTTP_PRINCIPAL = "HTTP/" + LOCALHOST;
|
||||
KDC.createPrincipal(KEYTAB_FILE, CLUSTER_PRINCIPAL, HTTP_PRINCIPAL);
|
||||
}
|
||||
|
||||
private static void setupCluster(HBaseTestingUtil util) throws Exception {
|
||||
Configuration conf = util.getConfiguration();
|
||||
|
||||
SecureTestUtil.enableSecurity(conf);
|
||||
VisibilityTestUtil.enableVisiblityLabels(conf);
|
||||
SecureTestUtil.verifyConfiguration(conf);
|
||||
|
||||
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
|
||||
AccessController.class.getName() + ',' + TokenProvider.class.getName());
|
||||
|
||||
HBaseKerberosUtils.setSecuredConfiguration(conf,
|
||||
CLUSTER_PRINCIPAL + '@' + KDC.getRealm(), HTTP_PRINCIPAL + '@' + KDC.getRealm());
|
||||
|
||||
util.startMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the security firstly for getting the correct default realm.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void beforeClass() throws Exception {
|
||||
setUpKdcServer();
|
||||
setupCluster(UTIL1);
|
||||
setupCluster(UTIL2);
|
||||
|
||||
try (Admin admin = UTIL1.getAdmin()) {
|
||||
admin.addReplicationPeer("1", ReplicationPeerConfig.newBuilder()
|
||||
.setClusterKey(ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration()))
|
||||
.putConfiguration(HBaseKerberosUtils.KRB_PRINCIPAL,
|
||||
UTIL2.getConfiguration().get(HBaseKerberosUtils.KRB_PRINCIPAL))
|
||||
.putConfiguration(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL,
|
||||
UTIL2.getConfiguration().get(HBaseKerberosUtils.MASTER_KRB_PRINCIPAL))
|
||||
.build());
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void cleanup() throws IOException {
|
||||
UTIL1.shutdownMiniCluster();
|
||||
UTIL2.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
@Parameters
|
||||
public static Collection<Supplier<String>> peer() {
|
||||
return Arrays.asList(
|
||||
() -> "1",
|
||||
() -> ZKConfig.getZooKeeperClusterKey(UTIL2.getConfiguration())
|
||||
);
|
||||
}
|
||||
|
||||
@Parameter
|
||||
public Supplier<String> peer;
|
||||
|
||||
@Test
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testJobCredentials() throws Exception {
|
||||
Job job = new VerifyReplication().createSubmittableJob(
|
||||
new Configuration(UTIL1.getConfiguration()),
|
||||
new String[] {
|
||||
peer.get(),
|
||||
"table"
|
||||
});
|
||||
|
||||
Credentials credentials = job.getCredentials();
|
||||
Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
|
||||
assertEquals(2, tokens.size());
|
||||
|
||||
String clusterId1 = ZKClusterId.readClusterIdZNode(UTIL1.getZooKeeperWatcher());
|
||||
Token<AuthenticationTokenIdentifier> tokenForCluster1 =
|
||||
(Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId1));
|
||||
assertEquals(FULL_USER_PRINCIPAL, tokenForCluster1.decodeIdentifier().getUsername());
|
||||
|
||||
String clusterId2 = ZKClusterId.readClusterIdZNode(UTIL2.getZooKeeperWatcher());
|
||||
Token<AuthenticationTokenIdentifier> tokenForCluster2 =
|
||||
(Token<AuthenticationTokenIdentifier>) credentials.getToken(new Text(clusterId2));
|
||||
assertEquals(FULL_USER_PRINCIPAL, tokenForCluster2.decodeIdentifier().getUsername());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue