HBASE-24880 Remove ReplicationPeerConfigUpgrader
Closes #2266 Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Jan Hentschel <jan.hentschel@ultratendency.com> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
c8c20160da
commit
ea26463a33
|
@ -187,7 +187,6 @@ import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||||
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
||||||
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
|
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
|
||||||
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
|
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
|
||||||
import org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader;
|
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
|
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
|
||||||
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
|
import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint;
|
||||||
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer;
|
||||||
|
@ -1025,13 +1024,6 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
zombieDetector.setDaemon(true);
|
zombieDetector.setDaemon(true);
|
||||||
zombieDetector.start();
|
zombieDetector.start();
|
||||||
|
|
||||||
// This is for backwards compatibility
|
|
||||||
// See HBASE-11393
|
|
||||||
status.setStatus("Update TableCFs node in ZNode");
|
|
||||||
ReplicationPeerConfigUpgrader tableCFsUpdater =
|
|
||||||
new ReplicationPeerConfigUpgrader(zooKeeper, conf);
|
|
||||||
tableCFsUpdater.copyTableCFs();
|
|
||||||
|
|
||||||
if (!maintenanceMode) {
|
if (!maintenanceMode) {
|
||||||
// Add the Observer to delete quotas on table deletion before starting all CPs by
|
// Add the Observer to delete quotas on table deletion before starting all CPs by
|
||||||
// default with quota support, avoiding if user specifically asks to not load this Observer.
|
// default with quota support, avoiding if user specifically asks to not load this Observer.
|
||||||
|
|
|
@ -1,175 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.replication.master;
|
|
||||||
|
|
||||||
import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE;
|
|
||||||
import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT;
|
|
||||||
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE;
|
|
||||||
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will
|
|
||||||
* be removed in HBase 3.x. See HBASE-11393
|
|
||||||
*/
|
|
||||||
@InterfaceAudience.Private
|
|
||||||
@InterfaceStability.Unstable
|
|
||||||
public class ReplicationPeerConfigUpgrader{
|
|
||||||
|
|
||||||
private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs";
|
|
||||||
private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs";
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
|
|
||||||
private final Configuration conf;
|
|
||||||
private final ZKWatcher zookeeper;
|
|
||||||
private final ReplicationPeerStorage peerStorage;
|
|
||||||
|
|
||||||
public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) {
|
|
||||||
this.zookeeper = zookeeper;
|
|
||||||
this.conf = conf;
|
|
||||||
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void upgrade() throws Exception {
|
|
||||||
try (Connection conn = ConnectionFactory.createConnection(conf)) {
|
|
||||||
Admin admin = conn.getAdmin();
|
|
||||||
admin.listReplicationPeers().forEach((peerDesc) -> {
|
|
||||||
String peerId = peerDesc.getPeerId();
|
|
||||||
ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
|
|
||||||
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
|
|
||||||
|| (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
|
|
||||||
peerConfig.setReplicateAllUserTables(false);
|
|
||||||
try {
|
|
||||||
admin.updateReplicationPeerConfig(peerId, peerConfig);
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void copyTableCFs() throws ReplicationException {
|
|
||||||
for (String peerId : peerStorage.listPeerIds()) {
|
|
||||||
if (!copyTableCFs(peerId)) {
|
|
||||||
LOG.error("upgrade tableCFs failed for peerId=" + peerId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
protected String getTableCFsNode(String peerId) {
|
|
||||||
String replicationZNode = ZNodePaths.joinZNode(zookeeper.getZNodePaths().baseZNode,
|
|
||||||
conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
|
|
||||||
String peersZNode =
|
|
||||||
ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT));
|
|
||||||
return ZNodePaths.joinZNode(peersZNode,
|
|
||||||
ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT)));
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean copyTableCFs(String peerId) throws ReplicationException {
|
|
||||||
String tableCFsNode = getTableCFsNode(peerId);
|
|
||||||
try {
|
|
||||||
if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
|
|
||||||
ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId);
|
|
||||||
// We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
|
|
||||||
if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
|
|
||||||
// we copy TableCFs node into PeerNode
|
|
||||||
LOG.info("Copy table ColumnFamilies into peer=" + peerId);
|
|
||||||
ReplicationProtos.TableCF[] tableCFs =
|
|
||||||
ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode));
|
|
||||||
if (tableCFs != null && tableCFs.length > 0) {
|
|
||||||
rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
|
|
||||||
peerStorage.updatePeerConfig(peerId, rpc);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG.info("No tableCFs in peerNode:" + peerId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (KeeperException e) {
|
|
||||||
LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
|
|
||||||
return false;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
|
|
||||||
return false;
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.warn("NOTICE!! Update peerId failed, peerId=" + peerId, e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void printUsageAndExit() {
|
|
||||||
System.err.printf(
|
|
||||||
"Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
|
|
||||||
+ " [options]");
|
|
||||||
System.err.println(" where [options] are:");
|
|
||||||
System.err.println(" -h|-help Show this help and exit.");
|
|
||||||
System.err.println(" copyTableCFs Copy table-cfs to replication peer config");
|
|
||||||
System.err.println(" upgrade Upgrade replication peer config to new format");
|
|
||||||
System.err.println();
|
|
||||||
System.exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
|
||||||
if (args.length != 1) {
|
|
||||||
printUsageAndExit();
|
|
||||||
}
|
|
||||||
if (args[0].equals("-help") || args[0].equals("-h")) {
|
|
||||||
printUsageAndExit();
|
|
||||||
} else if (args[0].equals("copyTableCFs")) {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
|
|
||||||
ReplicationPeerConfigUpgrader tableCFsUpdater =
|
|
||||||
new ReplicationPeerConfigUpgrader(zkw, conf);
|
|
||||||
tableCFsUpdater.copyTableCFs();
|
|
||||||
}
|
|
||||||
} else if (args[0].equals("upgrade")) {
|
|
||||||
Configuration conf = HBaseConfiguration.create();
|
|
||||||
try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
|
|
||||||
ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf);
|
|
||||||
upgrader.upgrade();
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
printUsageAndExit();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,221 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.hbase.replication.master;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
|
||||||
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.ClassRule;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.rules.TestName;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
@Category({ReplicationTests.class, SmallTests.class})
|
|
||||||
public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
|
|
||||||
|
|
||||||
@ClassRule
|
|
||||||
public static final HBaseClassTestRule CLASS_RULE =
|
|
||||||
HBaseClassTestRule.forClass(TestTableCFsUpdater.class);
|
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestTableCFsUpdater.class);
|
|
||||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private static ZKWatcher zkw = null;
|
|
||||||
private static Abortable abortable = null;
|
|
||||||
private static ZKStorageUtil zkStorageUtil = null;
|
|
||||||
|
|
||||||
private static class ZKStorageUtil extends ZKReplicationPeerStorage {
|
|
||||||
public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) {
|
|
||||||
super(zookeeper, conf);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Rule
|
|
||||||
public TestName name = new TestName();
|
|
||||||
|
|
||||||
public TestTableCFsUpdater() {
|
|
||||||
super(zkw, TEST_UTIL.getConfiguration());
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void setUpBeforeClass() throws Exception {
|
|
||||||
TEST_UTIL.startMiniZKCluster();
|
|
||||||
Configuration conf = TEST_UTIL.getConfiguration();
|
|
||||||
abortable = new Abortable() {
|
|
||||||
@Override
|
|
||||||
public void abort(String why, Throwable e) {
|
|
||||||
LOG.info(why, e);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAborted() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
zkw = new ZKWatcher(conf, "TableCFs", abortable, true);
|
|
||||||
zkStorageUtil = new ZKStorageUtil(zkw, conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDownAfterClass() throws Exception {
|
|
||||||
TEST_UTIL.shutdownMiniZKCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testUpgrade() throws Exception {
|
|
||||||
String peerId = "1";
|
|
||||||
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
|
|
||||||
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
|
|
||||||
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
|
|
||||||
|
|
||||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
|
||||||
String peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
|
||||||
|
|
||||||
String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3;
|
|
||||||
String tableCFsNode = getTableCFsNode(peerId);
|
|
||||||
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
|
|
||||||
|
|
||||||
ReplicationPeerConfig actualRpc =
|
|
||||||
ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
assertNull(actualRpc.getTableCFsMap());
|
|
||||||
assertEquals(tableCFs, actualTableCfs);
|
|
||||||
|
|
||||||
peerId = "2";
|
|
||||||
rpc = new ReplicationPeerConfig();
|
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
|
||||||
|
|
||||||
tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
|
|
||||||
tableCFsNode = getTableCFsNode(peerId);
|
|
||||||
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
|
|
||||||
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
assertNull(actualRpc.getTableCFsMap());
|
|
||||||
assertEquals(tableCFs, actualTableCfs);
|
|
||||||
|
|
||||||
peerId = "3";
|
|
||||||
rpc = new ReplicationPeerConfig();
|
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
|
||||||
|
|
||||||
tableCFs = "";
|
|
||||||
tableCFsNode = getTableCFsNode(peerId);
|
|
||||||
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
|
|
||||||
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
assertNull(actualRpc.getTableCFsMap());
|
|
||||||
assertEquals(tableCFs, actualTableCfs);
|
|
||||||
|
|
||||||
peerId = "4";
|
|
||||||
rpc = new ReplicationPeerConfig();
|
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
|
||||||
|
|
||||||
tableCFsNode = getTableCFsNode(peerId);
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
assertNull(actualRpc.getTableCFsMap());
|
|
||||||
assertNull(actualTableCfs);
|
|
||||||
|
|
||||||
copyTableCFs();
|
|
||||||
|
|
||||||
peerId = "1";
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
|
|
||||||
assertEquals(3, tableNameListMap.size());
|
|
||||||
assertTrue(tableNameListMap.containsKey(tableName1));
|
|
||||||
assertTrue(tableNameListMap.containsKey(tableName2));
|
|
||||||
assertTrue(tableNameListMap.containsKey(tableName3));
|
|
||||||
assertEquals(2, tableNameListMap.get(tableName1).size());
|
|
||||||
assertEquals("cf1", tableNameListMap.get(tableName1).get(0));
|
|
||||||
assertEquals("cf2", tableNameListMap.get(tableName1).get(1));
|
|
||||||
assertEquals(1, tableNameListMap.get(tableName2).size());
|
|
||||||
assertEquals("cf3", tableNameListMap.get(tableName2).get(0));
|
|
||||||
assertNull(tableNameListMap.get(tableName3));
|
|
||||||
|
|
||||||
peerId = "2";
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
tableNameListMap = actualRpc.getTableCFsMap();
|
|
||||||
assertEquals(2, tableNameListMap.size());
|
|
||||||
assertTrue(tableNameListMap.containsKey(tableName1));
|
|
||||||
assertTrue(tableNameListMap.containsKey(tableName2));
|
|
||||||
assertEquals(2, tableNameListMap.get(tableName1).size());
|
|
||||||
assertEquals("cf1", tableNameListMap.get(tableName1).get(0));
|
|
||||||
assertEquals("cf3", tableNameListMap.get(tableName1).get(1));
|
|
||||||
assertEquals(1, tableNameListMap.get(tableName2).size());
|
|
||||||
assertEquals("cf2", tableNameListMap.get(tableName2).get(0));
|
|
||||||
|
|
||||||
peerId = "3";
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
tableNameListMap = actualRpc.getTableCFsMap();
|
|
||||||
assertNull(tableNameListMap);
|
|
||||||
|
|
||||||
peerId = "4";
|
|
||||||
peerNode = zkStorageUtil.getPeerNode(peerId);
|
|
||||||
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
|
||||||
tableNameListMap = actualRpc.getTableCFsMap();
|
|
||||||
assertNull(tableNameListMap);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue