HBASE-17389 Convert all internal usages from ReplicationAdmin to Admin

This commit is contained in:
Guanghao Zhang 2017-02-07 10:18:59 +08:00
parent af9d359b8e
commit 9a78d00884
15 changed files with 305 additions and 177 deletions

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.snapshot.HBaseSnapshotException;
@ -1906,6 +1908,30 @@ public interface Admin extends Abortable, Closeable {
final ReplicationPeerConfig peerConfig) throws IOException {
}
/**
 * Append the replicable table-cf config of the specified peer.
 * <p>
 * Default implementation is a no-op; concrete {@code Admin} implementations
 * override this to merge the given table-cfs into the peer's existing config.
 * @param id a short name that identifies the cluster
 * @param tableCfs A map from tableName to column family names
 * @throws ReplicationException if a replication-layer error occurs
 * @throws IOException if a remote or network exception occurs
 */
default void appendReplicationPeerTableCFs(String id,
    Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
    IOException {
}
/**
 * Remove some table-cfs from config of the specified peer.
 * <p>
 * Default implementation is a no-op; concrete {@code Admin} implementations
 * override this to subtract the given table-cfs from the peer's existing config.
 * @param id a short name that identifies the cluster
 * @param tableCfs A map from tableName to column family names
 * @throws ReplicationException if a replication-layer error occurs
 * @throws IOException if a remote or network exception occurs
 */
default void removeReplicationPeerTableCFs(String id,
    Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
    IOException {
}
/**
* Return a list of replication peers.
* @return a list of replication peers description

View File

@ -23,10 +23,13 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
@ -81,6 +84,7 @@ import org.apache.hadoop.hbase.quotas.QuotaFilter;
import org.apache.hadoop.hbase.quotas.QuotaRetriever;
import org.apache.hadoop.hbase.quotas.QuotaSettings;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
@ -197,6 +201,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
@ -3871,6 +3876,84 @@ public class HBaseAdmin implements Admin {
});
}
/**
 * Merges {@code tableCfs} into the peer's current table-cfs config and persists
 * the result via {@link #updateReplicationPeerConfig(String, ReplicationPeerConfig)}.
 * A null/empty cf collection for a table means "replicate all cfs of that table".
 */
@Override
public void appendReplicationPeerTableCFs(String id,
    Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
    IOException {
  // Reject an outright-null request; an empty map is still allowed.
  if (tableCfs == null) {
    throw new ReplicationException("tableCfs is null");
  }
  ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
  Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
  if (preTableCfs == null) {
    // No table-cfs configured yet: adopt the requested map wholesale.
    peerConfig.setTableCFsMap(tableCfs);
  } else {
    // NOTE(review): the merge below mutates the map returned by
    // getTableCFsMap(); assumes that map is the live config held by
    // peerConfig so the mutation survives until the update call — confirm.
    for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
      TableName table = entry.getKey();
      Collection<String> appendCfs = entry.getValue();
      if (preTableCfs.containsKey(table)) {
        List<String> cfs = preTableCfs.get(table);
        if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
          // Either side covering "all cfs" widens the entry to all cfs (null).
          preTableCfs.put(table, null);
        } else {
          // Union the existing and appended cf names, de-duplicated via a set.
          Set<String> cfSet = new HashSet<String>(cfs);
          cfSet.addAll(appendCfs);
          preTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else {
        if (appendCfs == null || appendCfs.isEmpty()) {
          // New table with no explicit cfs: replicate all its cfs.
          preTableCfs.put(table, null);
        } else {
          preTableCfs.put(table, Lists.newArrayList(appendCfs));
        }
      }
    }
  }
  // Persist the merged config back to the peer.
  updateReplicationPeerConfig(id, peerConfig);
}
/**
 * Subtracts {@code tableCfs} from the peer's current table-cfs config and
 * persists the result via
 * {@link #updateReplicationPeerConfig(String, ReplicationPeerConfig)}.
 * <p>
 * Removal semantics per table: a null/empty cf collection removes a
 * whole-table ("all cfs") entry; a non-empty cf collection removes just those
 * cfs from an entry that lists cfs explicitly. Mixing the two forms, or naming
 * a table absent from the config, raises {@link ReplicationException}.
 */
@Override
public void removeReplicationPeerTableCFs(String id,
    Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
    IOException {
  // Reject an outright-null request; an empty map is still allowed.
  if (tableCfs == null) {
    throw new ReplicationException("tableCfs is null");
  }
  ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
  Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
  if (preTableCfs == null) {
    // BUGFIX: added the missing space before the id so the message does not
    // render as e.g. "Table-Cfs for peer1 is null" with the words fused.
    throw new ReplicationException("Table-Cfs for peer " + id + " is null");
  }
  for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
    TableName table = entry.getKey();
    Collection<String> removeCfs = entry.getValue();
    if (preTableCfs.containsKey(table)) {
      List<String> cfs = preTableCfs.get(table);
      if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
        // Whole-table entry removed by a whole-table request.
        preTableCfs.remove(table);
      } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
        // Subtract the named cfs; drop the entry entirely if none remain.
        Set<String> cfSet = new HashSet<String>(cfs);
        cfSet.removeAll(removeCfs);
        if (cfSet.isEmpty()) {
          preTableCfs.remove(table);
        } else {
          preTableCfs.put(table, Lists.newArrayList(cfSet));
        }
      } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
        throw new ReplicationException("Cannot remove cf of table: " + table
            + " which doesn't specify cfs from table-cfs config in peer: " + id);
      } else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
        throw new ReplicationException("Cannot remove table: " + table
            + " which has specified cfs from table-cfs config in peer: " + id);
      }
    } else {
      throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
    }
  }
  // Persist the reduced config back to the peer.
  updateReplicationPeerConfig(id, peerConfig);
}
@Override
public List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
return listReplicationPeers((Pattern)null);

View File

@ -194,7 +194,11 @@ public class ReplicationAdmin implements Closeable {
* Add a new remote slave cluster for replication.
* @param id a short name that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
* @deprecated use
* {@link org.apache.hadoop.hbase.client.Admin#addReplicationPeer(String, ReplicationPeerConfig)}
* instead
*/
@Deprecated
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException,
IOException {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
@ -210,6 +214,12 @@ public class ReplicationAdmin implements Closeable {
return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
}
/**
* @deprecated use
* {@link org.apache.hadoop.hbase.client.Admin#updateReplicationPeerConfig(String, ReplicationPeerConfig)}
* instead
*/
@Deprecated
public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws IOException {
this.admin.updateReplicationPeerConfig(id, peerConfig);
}
@ -217,7 +227,9 @@ public class ReplicationAdmin implements Closeable {
/**
* Removes a peer cluster and stops the replication to it.
* @param id a short name that identifies the cluster
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#removeReplicationPeer(String)} instead
*/
@Deprecated
public void removePeer(String id) throws IOException {
this.admin.removeReplicationPeer(id);
}
@ -225,7 +237,10 @@ public class ReplicationAdmin implements Closeable {
/**
* Restart the replication stream to the specified peer.
* @param id a short name that identifies the cluster
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#enableReplicationPeer(String)}
* instead
*/
@Deprecated
public void enablePeer(String id) throws IOException {
this.admin.enableReplicationPeer(id);
}
@ -233,7 +248,10 @@ public class ReplicationAdmin implements Closeable {
/**
* Stop the replication stream to the specified peer.
* @param id a short name that identifies the cluster
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#disableReplicationPeer(String)}
* instead
*/
@Deprecated
public void disablePeer(String id) throws IOException {
this.admin.disableReplicationPeer(id);
}
@ -242,11 +260,17 @@ public class ReplicationAdmin implements Closeable {
* Get the number of slave clusters the local cluster has.
* @return number of slave clusters
* @throws IOException
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
*/
@Deprecated
public int getPeersCount() throws IOException {
return this.admin.listReplicationPeers().size();
}
/**
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
*/
@Deprecated
public Map<String, ReplicationPeerConfig> listPeerConfigs() throws IOException {
List<ReplicationPeerDescription> peers = this.admin.listReplicationPeers();
Map<String, ReplicationPeerConfig> result = new TreeMap<String, ReplicationPeerConfig>();
@ -256,6 +280,11 @@ public class ReplicationAdmin implements Closeable {
return result;
}
/**
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#getReplicationPeerConfig(String)}
* instead
*/
@Deprecated
public ReplicationPeerConfig getPeerConfig(String id) throws IOException {
return admin.getReplicationPeerConfig(id);
}
@ -294,6 +323,7 @@ public class ReplicationAdmin implements Closeable {
* @throws ReplicationException
* @throws IOException
*/
@Deprecated
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException, IOException {
if (tableCfs == null) {
@ -350,6 +380,7 @@ public class ReplicationAdmin implements Closeable {
* @throws ReplicationException
* @throws IOException
*/
@Deprecated
public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException, IOException {
if (tableCfs == null) {
@ -398,6 +429,7 @@ public class ReplicationAdmin implements Closeable {
* to indicate replicating all column families. Pass null for replicating all table and column
* families
*/
@Deprecated
public void setPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws IOException {
ReplicationPeerConfig peerConfig = getPeerConfig(id);
@ -411,6 +443,7 @@ public class ReplicationAdmin implements Closeable {
* an IllegalArgumentException is thrown if it doesn't exist
* @return true if replication is enabled to that peer, false if it isn't
*/
@Deprecated
public boolean getPeerState(String id) throws ReplicationException, IOException {
List<ReplicationPeerDescription> peers = admin.listReplicationPeers(id);
if (peers.isEmpty() || !id.equals(peers.get(0).getPeerId())) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client.replication;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -46,4 +47,15 @@ public class TableCFs {
public Map<String, Integer> getColumnFamilyMap() {
return this.cfs;
}
/**
 * Renders this entry as {@code tableName} when no column families are
 * recorded, or {@code tableName:cf1,cf2,...} when they are.
 */
@Override
public String toString() {
  StringBuilder out = new StringBuilder(table.getNameAsString());
  if (cfs.isEmpty()) {
    return out.toString();
  }
  // Join the cf names with commas after a ':' separator.
  out.append(":").append(StringUtils.join(cfs.keySet(), ','));
  return out.toString();
}
}

View File

@ -3177,6 +3177,8 @@ public class HMaster extends HRegionServer implements MasterServices {
cpHost.preGetReplicationPeerConfig(peerId);
}
final ReplicationPeerConfig peerConfig = this.replicationManager.getPeerConfig(peerId);
LOG.info(getClientIdAuditPrefix() + " get replication peer config, id=" + peerId + ", config="
+ peerConfig);
if (cpHost != null) {
cpHost.postGetReplicationPeerConfig(peerId);
}

View File

@ -35,11 +35,11 @@ import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Bytes;
/**
@ -50,14 +50,14 @@ public class ReplicationMetaCleaner extends ScheduledChore {
private static final Log LOG = LogFactory.getLog(ReplicationMetaCleaner.class);
private ReplicationAdmin replicationAdmin;
private MasterServices master;
private final Admin admin;
private final MasterServices master;
public ReplicationMetaCleaner(MasterServices master, Stoppable stoppable, int period)
throws IOException {
super("ReplicationMetaCleaner", stoppable, period);
this.master = master;
replicationAdmin = new ReplicationAdmin(master.getConfiguration());
admin = master.getConnection().getAdmin();
}
@Override
@ -81,12 +81,12 @@ public class ReplicationMetaCleaner extends ScheduledChore {
return;
}
Map<String, ReplicationPeerConfig> peers = replicationAdmin.listPeerConfigs();
for (Map.Entry<String, ReplicationPeerConfig> entry : peers.entrySet()) {
for (Map.Entry<TableName, List<String>> map : entry.getValue().getTableCFsMap()
List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
for (ReplicationPeerDescription peerDesc : peers) {
for (Map.Entry<TableName, List<String>> map : peerDesc.getPeerConfig().getTableCFsMap()
.entrySet()) {
if (serialTables.containsKey(map.getKey().getNameAsString())) {
serialTables.get(map.getKey().getNameAsString()).add(entry.getKey());
serialTables.get(map.getKey().getNameAsString()).add(peerDesc.getPeerId());
break;
}
}

View File

@ -21,12 +21,12 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -37,15 +37,16 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.replication.TableCFs;
import org.apache.hadoop.hbase.io.WALLink;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@ -207,8 +208,8 @@ public class DumpReplicationQueues extends Configured implements Tool {
Configuration conf = getConf();
HBaseAdmin.available(conf);
ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
ClusterConnection connection = (ClusterConnection) ConnectionFactory.createConnection(conf);
Admin admin = connection.getAdmin();
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "DumpReplicationQueues" + System.currentTimeMillis(),
new WarnOnlyAbortable(), true);
@ -216,26 +217,28 @@ public class DumpReplicationQueues extends Configured implements Tool {
try {
// Our zk watcher
LOG.info("Our Quorum: " + zkw.getQuorum());
List<HashMap<String, String>> replicatedTables = replicationAdmin.listReplicated();
if (replicatedTables.isEmpty()) {
List<TableCFs> replicatedTableCFs = admin.listReplicatedTableCFs();
if (replicatedTableCFs.isEmpty()) {
LOG.info("No tables with a configured replication peer were found.");
return(0);
} else {
LOG.info("Replicated Tables: " + replicatedTables);
LOG.info("Replicated Tables: " + replicatedTableCFs);
}
Map<String, ReplicationPeerConfig> peerConfigs = replicationAdmin.listPeerConfigs();
List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
if (peerConfigs.isEmpty()) {
if (peers.isEmpty()) {
LOG.info("Replication is enabled but no peer configuration was found.");
}
System.out.println("Dumping replication peers and configurations:");
System.out.println(dumpPeersState(replicationAdmin, peerConfigs));
System.out.println(dumpPeersState(peers));
if (opts.isDistributed()) {
LOG.info("Found [--distributed], will poll each RegionServer.");
System.out.println(dumpQueues(connection, zkw, peerConfigs.keySet(), opts.isHdfs()));
Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId())
.collect(Collectors.toSet());
System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs()));
System.out.println(dumpReplicationSummary());
} else {
// use ZK instead
@ -279,28 +282,22 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString();
}
public String dumpPeersState(ReplicationAdmin replicationAdmin,
Map<String, ReplicationPeerConfig> peerConfigs) throws Exception {
public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exception {
Map<String, String> currentConf;
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, ReplicationPeerConfig> peer : peerConfigs.entrySet()) {
try {
ReplicationPeerConfig peerConfig = peer.getValue();
sb.append("Peer: " + peer.getKey() + "\n");
sb.append(" " + "State: "
+ (replicationAdmin.getPeerState(peer.getKey()) ? "ENABLED" : "DISABLED") + "\n");
sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
currentConf = peerConfig.getConfiguration();
// Only show when we have a custom configuration for the peer
if (currentConf.size() > 1) {
sb.append(" " + "Peer Configuration: " + currentConf + "\n");
}
sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
} catch (ReplicationException re) {
sb.append("Got an exception while invoking ReplicationAdmin: " + re + "\n");
for (ReplicationPeerDescription peer : peers) {
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
sb.append("Peer: " + peer.getPeerId() + "\n");
sb.append(" " + "State: " + (peer.isEnabled() ? "ENABLED" : "DISABLED") + "\n");
sb.append(" " + "Cluster Name: " + peerConfig.getClusterKey() + "\n");
sb.append(" " + "Replication Endpoint: " + peerConfig.getReplicationEndpointImpl() + "\n");
currentConf = peerConfig.getConfiguration();
// Only show when we have a custom configuration for the peer
if (currentConf.size() > 1) {
sb.append(" " + "Peer Configuration: " + currentConf + "\n");
}
sb.append(" " + "Peer Table CFs: " + peerConfig.getTableCFsMap() + "\n");
sb.append(" " + "Peer Namespaces: " + peerConfig.getNamespaces() + "\n");
}
return sb.toString();
}

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.io.HFileLink;
@ -148,10 +150,10 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
if (!isRegionReplicaReplicationEnabled(conf)) {
return;
}
ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
Admin admin = ConnectionFactory.createConnection(conf).getAdmin();
ReplicationPeerConfig peerConfig = null;
try {
peerConfig = repAdmin.getPeerConfig(REGION_REPLICA_REPLICATION_PEER);
peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
} catch (ReplicationPeerNotFoundException e) {
LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
+ " not exist", e);
@ -163,12 +165,10 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
peerConfig = new ReplicationPeerConfig();
peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
repAdmin.addPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, null);
admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
}
} catch (ReplicationException ex) {
throw new IOException(ex);
} finally {
repAdmin.close();
admin.close();
}
}

View File

@ -514,51 +514,34 @@ public class TestMasterReplication {
private void addPeer(String id, int masterClusterNumber,
int slaveClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
replicationAdmin.addPeer(id, rpc, null);
} finally {
close(replicationAdmin);
try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
.getAdmin()) {
admin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
}
}
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
ReplicationPeerConfig replicationPeerConfig = new ReplicationPeerConfig();
replicationPeerConfig.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
replicationAdmin.addPeer(id, replicationPeerConfig,
ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
} finally {
close(replicationAdmin);
try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
.getAdmin()) {
admin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
.setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)));
}
}
private void disablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.disablePeer(id);
} finally {
close(replicationAdmin);
try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
.getAdmin()) {
admin.disableReplicationPeer(id);
}
}
private void enablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.enablePeer(id);
} finally {
close(replicationAdmin);
try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
.getAdmin()) {
admin.enableReplicationPeer(id);
}
}

View File

@ -113,7 +113,7 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
TEST_UTIL.startMiniZKCluster();
MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true);
replicationAdmin = new ReplicationAdmin(conf);
admin = TEST_UTIL.getAdmin();
// Base conf2 on conf1 so it gets the right zk cluster.
conf1 = HBaseConfiguration.create(conf);
@ -136,7 +136,7 @@ public class TestVisibilityLabelReplicationWithExpAsString extends TestVisibilit
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(TEST_UTIL1.getClusterKey());
replicationAdmin.addPeer("2", rpc, null);
admin.addReplicationPeer("2", rpc);
HTableDescriptor table = new HTableDescriptor(TABLE_NAME);
HColumnDescriptor desc = new HColumnDescriptor(fam);

View File

@ -89,7 +89,7 @@ public class TestVisibilityLabelsReplication {
protected static Configuration conf;
protected static Configuration conf1;
protected static TableName TABLE_NAME = TableName.valueOf("TABLE_NAME");
protected static ReplicationAdmin replicationAdmin;
protected static Admin admin;
public static final String TOPSECRET = "topsecret";
public static final String PUBLIC = "public";
public static final String PRIVATE = "private";
@ -161,7 +161,7 @@ public class TestVisibilityLabelsReplication {
TEST_UTIL.startMiniZKCluster();
MiniZooKeeperCluster miniZK = TEST_UTIL.getZkCluster();
zkw1 = new ZooKeeperWatcher(conf, "cluster1", null, true);
replicationAdmin = new ReplicationAdmin(conf);
admin = TEST_UTIL.getAdmin();
// Base conf2 on conf1 so it gets the right zk cluster.
conf1 = HBaseConfiguration.create(conf);
@ -185,7 +185,7 @@ public class TestVisibilityLabelsReplication {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(TEST_UTIL1.getClusterKey());
replicationAdmin.addPeer("2", rpc, null);
admin.addReplicationPeer("2", rpc);
Admin hBaseAdmin = TEST_UTIL.getAdmin();
HTableDescriptor table = new HTableDescriptor(TABLE_NAME);

View File

@ -20,6 +20,7 @@
include Java
java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
java_import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper
java_import org.apache.hadoop.hbase.replication.ReplicationPeerConfig
java_import org.apache.hadoop.hbase.util.Bytes
java_import org.apache.hadoop.hbase.zookeeper.ZKConfig
@ -34,6 +35,7 @@ module Hbase
def initialize(configuration)
@replication_admin = ReplicationAdmin.new(configuration)
@configuration = configuration
@admin = ConnectionFactory.createConnection(configuration).getAdmin
end
#----------------------------------------------------------------------------------------------
@ -100,7 +102,7 @@ module Hbase
}
replication_peer_config.set_table_cfs_map(map)
end
@replication_admin.add_peer(id, replication_peer_config)
@admin.addReplicationPeer(id, replication_peer_config)
else
raise(ArgumentError, "args must be a Hash")
end
@ -109,46 +111,40 @@ module Hbase
#----------------------------------------------------------------------------------------------
# Remove a peer cluster, stops the replication
def remove_peer(id)
@replication_admin.removePeer(id)
@admin.removeReplicationPeer(id)
end
#---------------------------------------------------------------------------------------------
# Show replicated tables/column families, and their ReplicationType
def list_replicated_tables(regex = ".*")
pattern = java.util.regex.Pattern.compile(regex)
list = @replication_admin.listReplicated()
list.select {|s| pattern.match(s.get(ReplicationAdmin::TNAME))}
list = @admin.listReplicatedTableCFs()
list.select {|t| pattern.match(t.getTable().getNameAsString())}
end
#----------------------------------------------------------------------------------------------
# List all peer clusters
def list_peers
@replication_admin.listPeerConfigs
end
#----------------------------------------------------------------------------------------------
# Get peer cluster state
def get_peer_state(id)
@replication_admin.getPeerState(id) ? "ENABLED" : "DISABLED"
@admin.listReplicationPeers
end
#----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer
def enable_peer(id)
@replication_admin.enablePeer(id)
@admin.enableReplicationPeer(id)
end
#----------------------------------------------------------------------------------------------
# Stop the replication stream to the specified peer
def disable_peer(id)
@replication_admin.disablePeer(id)
@admin.disableReplicationPeer(id)
end
#----------------------------------------------------------------------------------------------
# Show the current tableCFs config for the specified peer
def show_peer_tableCFs(id)
@replication_admin.getPeerTableCFs(id)
rpc = @admin.getReplicationPeerConfig(id)
ReplicationSerDeHelper.convertToString(rpc.getTableCFsMap())
end
#----------------------------------------------------------------------------------------------
@ -160,8 +156,12 @@ module Hbase
tableCFs.each{|key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
}
rpc = get_peer_config(id)
unless rpc.nil?
rpc.setTableCFsMap(map)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
@replication_admin.setPeerTableCFs(id, map)
end
#----------------------------------------------------------------------------------------------
@ -174,7 +174,7 @@ module Hbase
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
}
end
@replication_admin.appendPeerTableCFs(id, map)
@admin.appendReplicationPeerTableCFs(id, map)
end
#----------------------------------------------------------------------------------------------
@ -187,7 +187,7 @@ module Hbase
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
}
end
@replication_admin.removePeerTableCFs(id, map)
@admin.removeReplicationPeerTableCFs(id, map)
end
# Set new namespaces config for the specified peer
@ -200,7 +200,7 @@ module Hbase
rpc = get_peer_config(id)
unless rpc.nil?
rpc.setNamespaces(ns_set)
@replication_admin.updatePeerConfig(id, rpc)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
end
@ -218,7 +218,7 @@ module Hbase
ns_set.add(n)
end
rpc.setNamespaces(ns_set)
@replication_admin.updatePeerConfig(id, rpc)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
end
@ -235,7 +235,7 @@ module Hbase
end
end
rpc.setNamespaces(ns_set)
@replication_admin.updatePeerConfig(id, rpc)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
end
@ -257,7 +257,7 @@ module Hbase
rpc = get_peer_config(id)
unless rpc.nil?
rpc.setBandwidth(bandwidth)
@replication_admin.updatePeerConfig(id, rpc)
@admin.updateReplicationPeerConfig(id, rpc)
end
end
@ -265,26 +265,27 @@ module Hbase
# Enables a table's replication switch
def enable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@replication_admin.enableTableRep(tableName)
@admin.enableTableReplication(tableName)
end
#----------------------------------------------------------------------------------------------
# Disables a table's replication switch
def disable_tablerep(table_name)
tableName = TableName.valueOf(table_name)
@replication_admin.disableTableRep(tableName)
@admin.disableTableReplication(tableName)
end
def list_peer_configs
@replication_admin.list_peer_configs
map = java.util.HashMap.new
peers = @admin.listReplicationPeers
peers.each do |peer|
map.put(peer.getPeerId, peer.getPeerConfig)
end
return map
end
def get_peer_config(id)
@replication_admin.get_peer_config(id)
end
def peer_added(id)
@replication_admin.peer_added(id)
@admin.getReplicationPeerConfig(id)
end
def update_peer_config(id, args={})
@ -306,7 +307,7 @@ module Hbase
}
end
@replication_admin.update_peer_config(id, replication_peer_config)
@admin.updateReplicationPeerConfig(id, replication_peer_config)
end
end
end

View File

@ -35,13 +35,15 @@ EOF
formatter.header(["PEER_ID", "CLUSTER_KEY", "ENDPOINT_CLASSNAME",
"STATE", "NAMESPACES", "TABLE_CFS", "BANDWIDTH"])
peers.entrySet().each do |e|
state = replication_admin.get_peer_state(e.key)
namespaces = replication_admin.show_peer_namespaces(e.value)
tableCFs = replication_admin.show_peer_tableCFs(e.key)
formatter.row([ e.key, e.value.getClusterKey,
e.value.getReplicationEndpointImpl, state, namespaces, tableCFs,
e.value.getBandwidth ])
peers.each do |peer|
id = peer.getPeerId
state = peer.isEnabled ? "ENABLED" : "DISABLED"
config = peer.getPeerConfig
namespaces = replication_admin.show_peer_namespaces(config)
tableCFs = replication_admin.show_peer_tableCFs(id)
formatter.row([ id, config.getClusterKey,
config.getReplicationEndpointImpl, state, namespaces, tableCFs,
config.getBandwidth ])
end
formatter.footer()

View File

@ -34,12 +34,19 @@ EOF
formatter.header([ "TABLE:COLUMNFAMILY", "ReplicationType" ], [ 32 ])
list = replication_admin.list_replicated_tables(regex)
list.each do |e|
if e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::REPLICATIONTYPE) == org.apache.hadoop.hbase.client.replication.ReplicationAdmin::REPLICATIONGLOBAL
replicateType = "GLOBAL"
else
replicateType = "unknown"
map = e.getColumnFamilyMap()
map.each do |cf|
if cf[1] == org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_LOCAL
replicateType = "LOCAL"
elsif cf[1] == org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_GLOBAL
replicateType = "GLOBAL"
elsif cf[1] == org.apache.hadoop.hbase.HConstants::REPLICATION_SCOPE_SERIAL
replicateType = "SERIAL"
else
replicateType = "UNKNOWN"
end
formatter.row([e.getTable().getNameAsString() + ":" + cf[0], replicateType], true, [32])
end
formatter.row([e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::TNAME) + ":" + e.get(org.apache.hadoop.hbase.client.replication.ReplicationAdmin::CFNAME), replicateType], true, [32])
end
formatter.footer()
end

View File

@ -73,8 +73,8 @@ module Hbase
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
# cleanup for future tests
command(:remove_peer, @peer_id)
@ -86,8 +86,8 @@ module Hbase
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key})
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
# cleanup for future tests
command(:remove_peer, @peer_id)
@ -100,8 +100,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
# cleanup for future tests
command(:remove_peer, @peer_id)
@ -114,8 +114,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
# cleanup for future tests
command(:remove_peer, @peer_id)
@ -130,8 +130,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -152,8 +152,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -186,8 +186,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
assert_tablecfs_equal(table_cfs, command(:get_peer_config, @peer_id).getTableCFsMap())
# cleanup for future tests
@ -210,8 +210,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
command(:set_peer_tableCFs, @peer_id, table_cfs)
@ -227,8 +227,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
command(:append_peer_tableCFs, @peer_id, table_cfs)
@ -249,8 +249,8 @@ module Hbase
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
assert_equal(cluster_key, command(:list_peers).fetch(@peer_id).get_cluster_key)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"] }
command(:remove_peer_tableCFs, @peer_id, { "ns3:table3" => ["cf1", "cf2"] })
@ -268,15 +268,11 @@ module Hbase
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
# Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
# but here we have to do it ourselves
replication_admin.peer_added(@peer_id)
command(:set_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -292,15 +288,11 @@ module Hbase
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
# Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
# but here we have to do it ourselves
replication_admin.peer_added(@peer_id)
command(:append_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -309,8 +301,8 @@ module Hbase
command(:append_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -318,8 +310,8 @@ module Hbase
command(:append_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -334,17 +326,13 @@ module Hbase
args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces }
command(:add_peer, @peer_id, args)
# Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
# but here we have to do it ourselves
replication_admin.peer_added(@peer_id)
namespaces = ["ns1", "ns2"]
namespaces_str = "ns3"
command(:remove_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -353,8 +341,8 @@ module Hbase
command(:remove_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -362,8 +350,8 @@ module Hbase
command(:remove_peer_namespaces, @peer_id, namespaces)
assert_equal(1, command(:list_peers).length)
assert(command(:list_peers).key?(@peer_id))
peer_config = command(:list_peers).fetch(@peer_id)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
peer_config = command(:list_peers).get(0).getPeerConfig
assert_equal(namespaces_str,
replication_admin.show_peer_namespaces(peer_config))
@ -375,9 +363,6 @@ module Hbase
cluster_key = "localhost:2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key }
command(:add_peer, @peer_id, args)
# Normally the ReplicationSourceManager will call ReplicationPeer#peer_added
# but here we have to do it ourselves
replication_admin.peer_added(@peer_id)
peer_config = command(:get_peer_config, @peer_id)
assert_equal(0, peer_config.get_bandwidth)
@ -442,9 +427,6 @@ module Hbase
args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params}
command(:add_peer, @peer_id, args)
#Normally the ReplicationSourceManager will call ReplicationPeer#peer_added, but here we have to do it ourselves
replication_admin.peer_added(@peer_id)
new_config_params = { "config1" => "new_value1" }
new_data_params = {"data1" => "new_value1"}
new_args = {CONFIG => new_config_params, DATA => new_data_params}