HBASE-7963 HBase VerifyReplication not working when security enabled
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1564069 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d4355b91d5
commit
5b918a16fd
|
@ -83,7 +83,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
@Override
|
@Override
|
||||||
public void init() throws ReplicationException {
|
public void init() throws ReplicationException {
|
||||||
try {
|
try {
|
||||||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
|
||||||
|
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||||
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new ReplicationException("Could not initialize replication peers", e);
|
throw new ReplicationException("Could not initialize replication peers", e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -395,6 +395,32 @@ public class TableMapReduceUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Obtain an authentication token, for the specified cluster, on behalf of the current user
|
||||||
|
* and add it to the credentials for the given map reduce job.
|
||||||
|
*
|
||||||
|
* The quorumAddress is the key to the ZK ensemble, which contains:
|
||||||
|
* hbase.zookeeper.quorum, hbase.zookeeper.client.port and zookeeper.znode.parent
|
||||||
|
*
|
||||||
|
* @param job The job that requires the permission.
|
||||||
|
* @param quorumAddress string that contains the 3 required configuratins
|
||||||
|
* @throws IOException When the authentication token cannot be obtained.
|
||||||
|
*/
|
||||||
|
public static void initCredentialsForCluster(Job job, String quorumAddress)
|
||||||
|
throws IOException {
|
||||||
|
UserProvider userProvider = UserProvider.instantiate(job.getConfiguration());
|
||||||
|
if (userProvider.isHBaseSecurityEnabled()) {
|
||||||
|
try {
|
||||||
|
Configuration peerConf = HBaseConfiguration.create(job.getConfiguration());
|
||||||
|
ZKUtil.applyClusterKeyToConf(peerConf, quorumAddress);
|
||||||
|
obtainAuthTokenForJob(job, peerConf, userProvider.getCurrent());
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.info("Interrupted obtaining user authentication token");
|
||||||
|
Thread.interrupted();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
|
private static void obtainAuthTokenForJob(Job job, Configuration conf, User user)
|
||||||
throws IOException, InterruptedException {
|
throws IOException, InterruptedException {
|
||||||
Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
|
Token<AuthenticationTokenIdentifier> authToken = getAuthToken(conf, user);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.mapreduce.Job;
|
import org.apache.hadoop.mapreduce.Job;
|
||||||
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
|
||||||
|
|
||||||
|
@ -106,35 +107,13 @@ public class VerifyReplication {
|
||||||
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
HConnectionManager.execute(new HConnectable<Void>(conf) {
|
||||||
@Override
|
@Override
|
||||||
public Void connect(HConnection conn) throws IOException {
|
public Void connect(HConnection conn) throws IOException {
|
||||||
ZooKeeperWatcher localZKW = null;
|
String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
|
||||||
ReplicationPeer peer = null;
|
Configuration peerConf = HBaseConfiguration.create(conf);
|
||||||
try {
|
ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
|
||||||
localZKW = new ZooKeeperWatcher(
|
|
||||||
conf, "VerifyReplication", new Abortable() {
|
HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
|
||||||
@Override public void abort(String why, Throwable e) {}
|
scan.setStartRow(value.getRow());
|
||||||
@Override public boolean isAborted() {return false;}
|
replicatedScanner = replicatedTable.getScanner(scan);
|
||||||
});
|
|
||||||
ReplicationPeers rp =
|
|
||||||
ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
|
|
||||||
rp.init();
|
|
||||||
Configuration peerConf = rp.getPeerConf(peerId);
|
|
||||||
if (peerConf == null) {
|
|
||||||
throw new IOException("Couldn't get peer conf!");
|
|
||||||
}
|
|
||||||
HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
|
|
||||||
scan.setStartRow(value.getRow());
|
|
||||||
replicatedScanner = replicatedTable.getScanner(scan);
|
|
||||||
} catch (ReplicationException e) {
|
|
||||||
throw new IOException(
|
|
||||||
"An error occured while trying to connect to the remove peer cluster", e);
|
|
||||||
} finally {
|
|
||||||
if (peer != null) {
|
|
||||||
peer.close();
|
|
||||||
}
|
|
||||||
if (localZKW != null) {
|
|
||||||
localZKW.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -157,6 +136,38 @@ public class VerifyReplication {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String getPeerQuorumAddress(final Configuration conf) throws IOException {
|
||||||
|
ZooKeeperWatcher localZKW = null;
|
||||||
|
ReplicationPeer peer = null;
|
||||||
|
try {
|
||||||
|
localZKW = new ZooKeeperWatcher(conf, "VerifyReplication",
|
||||||
|
new Abortable() {
|
||||||
|
@Override public void abort(String why, Throwable e) {}
|
||||||
|
@Override public boolean isAborted() {return false;}
|
||||||
|
});
|
||||||
|
|
||||||
|
ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf, localZKW);
|
||||||
|
rp.init();
|
||||||
|
|
||||||
|
Configuration peerConf = rp.getPeerConf(peerId);
|
||||||
|
if (peerConf == null) {
|
||||||
|
throw new IOException("Couldn't get peer conf!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return ZKUtil.getZooKeeperClusterKey(peerConf);
|
||||||
|
} catch (ReplicationException e) {
|
||||||
|
throw new IOException(
|
||||||
|
"An error occured while trying to connect to the remove peer cluster", e);
|
||||||
|
} finally {
|
||||||
|
if (peer != null) {
|
||||||
|
peer.close();
|
||||||
|
}
|
||||||
|
if (localZKW != null) {
|
||||||
|
localZKW.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up the actual job.
|
* Sets up the actual job.
|
||||||
*
|
*
|
||||||
|
@ -181,6 +192,11 @@ public class VerifyReplication {
|
||||||
if (families != null) {
|
if (families != null) {
|
||||||
conf.set(NAME+".families", families);
|
conf.set(NAME+".families", families);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String peerQuorumAddress = getPeerQuorumAddress(conf);
|
||||||
|
conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
|
||||||
|
LOG.info("Peer Quorum Address: " + peerQuorumAddress);
|
||||||
|
|
||||||
Job job = new Job(conf, NAME + "_" + tableName);
|
Job job = new Job(conf, NAME + "_" + tableName);
|
||||||
job.setJarByClass(VerifyReplication.class);
|
job.setJarByClass(VerifyReplication.class);
|
||||||
|
|
||||||
|
@ -194,6 +210,10 @@ public class VerifyReplication {
|
||||||
}
|
}
|
||||||
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
TableMapReduceUtil.initTableMapperJob(tableName, scan,
|
||||||
Verifier.class, null, null, job);
|
Verifier.class, null, null, job);
|
||||||
|
|
||||||
|
// Obtain the auth token from peer cluster
|
||||||
|
TableMapReduceUtil.initCredentialsForCluster(job, peerQuorumAddress);
|
||||||
|
|
||||||
job.setOutputFormatClass(NullOutputFormat.class);
|
job.setOutputFormatClass(NullOutputFormat.class);
|
||||||
job.setNumReduceTasks(0);
|
job.setNumReduceTasks(0);
|
||||||
return job;
|
return job;
|
||||||
|
|
Loading…
Reference in New Issue