HBASE-24743 Reject to add a peer which replicate to itself earlier (#2122)

Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Guanghao Zhang 2020-07-24 08:08:20 +08:00 committed by GitHub
parent e7963f6486
commit 32c7012ad7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 116 additions and 104 deletions

View File

@ -822,7 +822,7 @@ public class HMaster extends HRegionServer implements MasterServices {
} }
this.rsGroupInfoManager = RSGroupInfoManager.create(this); this.rsGroupInfoManager = RSGroupInfoManager.create(this);
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
this.drainingServerTracker.start(); this.drainingServerTracker.start();

View File

@ -35,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@ -50,9 +51,11 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils; import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint; import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
@ -81,11 +84,17 @@ public class ReplicationPeerManager {
// Only allow to add one sync replication peer concurrently // Only allow to add one sync replication peer concurrently
private final Semaphore syncReplicationPeerLock = new Semaphore(1); private final Semaphore syncReplicationPeerLock = new Semaphore(1);
private final String clusterId;
private final Configuration conf;
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage, ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
ConcurrentMap<String, ReplicationPeerDescription> peers) { ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
this.peerStorage = peerStorage; this.peerStorage = peerStorage;
this.queueStorage = queueStorage; this.queueStorage = queueStorage;
this.peers = peers; this.peers = peers;
this.conf = conf;
this.clusterId = clusterId;
} }
private void checkQueuesDeleted(String peerId) private void checkQueuesDeleted(String peerId)
@ -337,11 +346,10 @@ public class ReplicationPeerManager {
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException { private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl(); String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
boolean checkClusterKey = true; ReplicationEndpoint endpoint = null;
if (!StringUtils.isBlank(replicationEndpointImpl)) { if (!StringUtils.isBlank(replicationEndpointImpl)) {
// try creating a instance
ReplicationEndpoint endpoint;
try { try {
// try creating a instance
endpoint = Class.forName(replicationEndpointImpl) endpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance(); .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
} catch (Throwable e) { } catch (Throwable e) {
@ -349,14 +357,15 @@ public class ReplicationPeerManager {
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl, "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
e); e);
} }
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
checkClusterKey = false;
}
} }
if (checkClusterKey) { // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
checkClusterKey(peerConfig.getClusterKey()); checkClusterKey(peerConfig.getClusterKey());
} }
// Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
checkClusterId(peerConfig.getClusterKey());
}
if (peerConfig.replicateAllUserTables()) { if (peerConfig.replicateAllUserTables()) {
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster. // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
@ -501,6 +510,25 @@ public class ReplicationPeerManager {
} }
} }
private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
String peerClusterId = "";
try {
// Create the peer cluster config for get peer cluster id
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
}
} catch (IOException | KeeperException e) {
throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
}
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId)) {
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
+ ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
}
}
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) { public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial()) return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId()) .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
@ -511,7 +539,7 @@ public class ReplicationPeerManager {
return queueStorage; return queueStorage;
} }
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf) public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
throws ReplicationException { throws ReplicationException {
ReplicationPeerStorage peerStorage = ReplicationPeerStorage peerStorage =
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf); ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
@ -523,7 +551,7 @@ public class ReplicationPeerManager {
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state)); peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
} }
return new ReplicationPeerManager(peerStorage, return new ReplicationPeerManager(peerStorage,
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers); ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
} }
/** /**

View File

@ -523,16 +523,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
if(!this.isSourceActive()) { if(!this.isSourceActive()) {
return; return;
} }
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
// peerClusterId value, which is the same as the source clusterId
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
this.manager.removeSource(this);
return;
}
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};", LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId); logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);

View File

@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class); HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
private final String ID_ONE = "1"; private final String ID_ONE = "1";
private final String KEY_ONE = "127.0.0.1:2181:/hbase"; private static String KEY_ONE;
private final String ID_TWO = "2"; private final String ID_TWO = "2";
private final String KEY_TWO = "127.0.0.1:2181:/hbase2"; private static String KEY_TWO;
@BeforeClass @BeforeClass
public static void setUpBeforeClass() throws Exception { public static void setUpBeforeClass() throws Exception {
@ -82,6 +82,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.startMiniCluster(); TEST_UTIL.startMiniCluster();
KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
} }

View File

@ -81,7 +81,7 @@ public class TestReplicationAdminForSyncReplication {
Thread[] threads = new Thread[5]; Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
String peerId = "id" + i; String peerId = "id" + i;
String clusterKey = "127.0.0.1:2181:/hbase" + i; String clusterKey = TEST_UTIL.getClusterKey() + "-test" + i;
int index = i; int index = i;
threads[i] = new Thread(() -> { threads[i] = new Thread(() -> {
try { try {

View File

@ -113,6 +113,11 @@ public class SerialReplicationTestBase {
protected void doStop() { protected void doStop() {
notifyStopped(); notifyStopped();
} }
@Override
public boolean canReplicateToSameCluster() {
return true;
}
} }
@BeforeClass @BeforeClass

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Optional; import java.util.Optional;
import java.util.Random; import java.util.Random;
@ -34,17 +33,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterMetrics; import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
@ -72,9 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil; import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -176,40 +170,16 @@ public class TestMasterReplication {
/** /**
* Tests the replication scenario 0 -> 0. By default * Tests the replication scenario 0 -> 0. By default
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
* ReplicationSource should terminate, and no further logs should get enqueued * the replication peer should not be added.
*/ */
@Test @Test(expected = DoNotRetryIOException.class)
public void testLoopedReplication() throws Exception { public void testLoopedReplication()
throws Exception {
LOG.info("testLoopedReplication"); LOG.info("testLoopedReplication");
startMiniClusters(1); startMiniClusters(1);
createTableOnClusters(table); createTableOnClusters(table);
addPeer("1", 0, 0); addPeer("1", 0, 0);
Thread.sleep(SLEEP_TIME);
// wait for source to terminate
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
ClusterMetrics clusterStatus = utilities[0].getAdmin()
.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
List<ReplicationLoadSource> replicationLoadSourceList =
serverLoad.getReplicationLoadSourceList();
return replicationLoadSourceList.isEmpty();
}
});
Table[] htables = getHTablesOnClusters(tableName);
putAndWait(row, famName, htables[0], htables[0]);
rollWALAndWait(utilities[0], table.getTableName(), row);
ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
ZNodePaths.joinZNode("replication", "rs"));
List<String> listChildrenNoWatch =
ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
assertEquals(0, listChildrenNoWatch.size());
} }
/** /**

View File

@ -490,6 +490,11 @@ public class TestReplicationEndpoint extends TestReplicationBase {
stoppedCount.incrementAndGet(); stoppedCount.incrementAndGet();
notifyStopped(); notifyStopped();
} }
@Override
public boolean canReplicateToSameCluster() {
return true;
}
} }
public static class InterClusterReplicationEndpointForTest public static class InterClusterReplicationEndpointForTest

View File

@ -127,6 +127,11 @@ public class TestRaceWhenCreatingReplicationSource {
protected void doStop() { protected void doStop() {
notifyStopped(); notifyStopped();
} }
@Override
public boolean canReplicateToSameCluster() {
return true;
}
} }
@BeforeClass @BeforeClass

View File

@ -174,8 +174,9 @@ public class TestHBaseFsckCleanReplicationBarriers {
} }
public static void createPeer() throws IOException { public static void createPeer() throws IOException {
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder() ReplicationPeerConfig rpc =
.setClusterKey(UTIL.getClusterKey()).setSerial(true).build(); ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
.setSerial(true).build();
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc); UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc); UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
} }

View File

@ -33,6 +33,7 @@ module Hbase
def setup def setup
@peer_id = '1' @peer_id = '1'
@dummy_endpoint = 'org.apache.hadoop.hbase.replication.DummyReplicationEndpoint'
setup_hbase setup_hbase
@ -73,7 +74,8 @@ module Hbase
define_test "add_peer: single zk cluster key" do define_test "add_peer: single zk cluster key" do
cluster_key = "server1.cie.com:2181:/hbase" cluster_key = "server1.cie.com:2181:/hbase"
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0) peer = command(:list_peers).get(0)
@ -88,7 +90,8 @@ module Hbase
define_test "add_peer: multiple zk cluster key" do define_test "add_peer: multiple zk cluster key" do
cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
command(:add_peer, @peer_id, {CLUSTER_KEY => cluster_key}) args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0) peer = command(:list_peers).get(0)
@ -106,8 +109,8 @@ module Hbase
table_cfs = { "ns3:table1" => [], "ns3:table2" => [], table_cfs = { "ns3:table1" => [], "ns3:table2" => [],
"ns3:table3" => [] } "ns3:table3" => [] }
# add a new replication peer which serial flag is true # add a new replication peer which serial flag is true
args = { CLUSTER_KEY => cluster_key, SERIAL => true, args = {CLUSTER_KEY => cluster_key, SERIAL => true,
TABLE_CFS => table_cfs} TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -126,8 +129,8 @@ module Hbase
remote_wal_dir = "hdfs://srv1:9999/hbase" remote_wal_dir = "hdfs://srv1:9999/hbase"
table_cfs = { "ns3:table1" => [], "ns3:table2" => [], table_cfs = { "ns3:table1" => [], "ns3:table2" => [],
"ns3:table3" => [] } "ns3:table3" => [] }
args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir, args = {CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir,
TABLE_CFS => table_cfs} TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -144,7 +147,7 @@ module Hbase
define_test "add_peer: single zk cluster key with enabled/disabled state" do define_test "add_peer: single zk cluster key with enabled/disabled state" do
cluster_key = "server1.cie.com:2181:/hbase" cluster_key = "server1.cie.com:2181:/hbase"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -153,7 +156,8 @@ module Hbase
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' } enable_args = {CLUSTER_KEY => cluster_key, STATE => 'ENABLED',
ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, enable_args) command(:add_peer, @peer_id, enable_args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -162,7 +166,8 @@ module Hbase
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' } disable_args = {CLUSTER_KEY => cluster_key, STATE => 'DISABLED',
ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, disable_args) command(:add_peer, @peer_id, disable_args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -175,7 +180,7 @@ module Hbase
define_test "add_peer: multiple zk cluster key - peer config" do define_test "add_peer: multiple zk cluster key - peer config" do
cluster_key = "zk1,zk2,zk3:2182:/hbase-prod" cluster_key = "zk1,zk2,zk3:2182:/hbase-prod"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -193,7 +198,8 @@ module Hbase
namespaces = ["ns1", "ns2", "ns3"] namespaces = ["ns1", "ns2", "ns3"]
namespaces_str = "ns1;ns2;ns3" namespaces_str = "ns1;ns2;ns3"
args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces } args = {CLUSTER_KEY => cluster_key, NAMESPACES => namespaces,
ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -216,8 +222,8 @@ module Hbase
"ns3:table3" => ["cf1", "cf2"] } "ns3:table3" => ["cf1", "cf2"] }
namespaces_str = "ns1;ns2" namespaces_str = "ns1;ns2"
args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces, args = {CLUSTER_KEY => cluster_key, NAMESPACES => namespaces,
TABLE_CFS => table_cfs } TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -253,7 +259,8 @@ module Hbase
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] } table_cfs = { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } args = {CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs,
ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -279,7 +286,7 @@ module Hbase
define_test "set_peer_tableCFs: works with table-cfs map" do define_test "set_peer_tableCFs: works with table-cfs map" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = { CLUSTER_KEY => cluster_key} args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false) command(:set_peer_replicate_all, @peer_id, false)
@ -298,7 +305,7 @@ module Hbase
define_test "append_peer_tableCFs: works with table-cfs map" do define_test "append_peer_tableCFs: works with table-cfs map" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = { CLUSTER_KEY => cluster_key} args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false) command(:set_peer_replicate_all, @peer_id, false)
@ -322,7 +329,8 @@ module Hbase
define_test "remove_peer_tableCFs: works with table-cfs map" do define_test "remove_peer_tableCFs: works with table-cfs map" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] } table_cfs = { "table1" => [], "ns2:table2" => ["cf1"], "ns3:table3" => ["cf1", "cf2"] }
args = { CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs } args = {CLUSTER_KEY => cluster_key, TABLE_CFS => table_cfs,
ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -340,7 +348,7 @@ module Hbase
define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do define_test 'set_peer_exclude_tableCFs: works with table-cfs map' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -363,7 +371,7 @@ module Hbase
define_test "append_peer_exclude_tableCFs: works with exclude table-cfs map" do define_test "append_peer_exclude_tableCFs: works with exclude table-cfs map" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key} args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0) peer = command(:list_peers).get(0)
@ -398,7 +406,7 @@ module Hbase
define_test 'remove_peer_exclude_tableCFs: works with exclude table-cfs map' do define_test 'remove_peer_exclude_tableCFs: works with exclude table-cfs map' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
args = {CLUSTER_KEY => cluster_key} args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
peer = command(:list_peers).get(0) peer = command(:list_peers).get(0)
@ -436,7 +444,7 @@ module Hbase
namespaces = ["ns1", "ns2"] namespaces = ["ns1", "ns2"]
namespaces_str = "ns1;ns2" namespaces_str = "ns1;ns2"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false) command(:set_peer_replicate_all, @peer_id, false)
@ -457,7 +465,7 @@ module Hbase
namespaces = ["ns1", "ns2"] namespaces = ["ns1", "ns2"]
namespaces_str = "ns1;ns2" namespaces_str = "ns1;ns2"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, false) command(:set_peer_replicate_all, @peer_id, false)
@ -496,7 +504,8 @@ module Hbase
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
namespaces = ["ns1", "ns2", "ns3"] namespaces = ["ns1", "ns2", "ns3"]
args = { CLUSTER_KEY => cluster_key, NAMESPACES => namespaces } args = {CLUSTER_KEY => cluster_key, NAMESPACES => namespaces,
ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
namespaces = ["ns1", "ns2"] namespaces = ["ns1", "ns2"]
@ -537,7 +546,7 @@ module Hbase
namespaces = ['ns1', 'ns2'] namespaces = ['ns1', 'ns2']
namespaces_str = '!ns1;ns2' namespaces_str = '!ns1;ns2'
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_exclude_namespaces, @peer_id, namespaces) command(:set_peer_exclude_namespaces, @peer_id, namespaces)
@ -554,7 +563,7 @@ module Hbase
define_test 'set_peer_replicate_all' do define_test 'set_peer_replicate_all' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -576,7 +585,7 @@ module Hbase
define_test 'set_peer_serial' do define_test 'set_peer_serial' do
cluster_key = 'zk4,zk5,zk6:11000:/hbase-test' cluster_key = 'zk4,zk5,zk6:11000:/hbase-test'
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -599,7 +608,7 @@ module Hbase
define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do define_test "set_peer_bandwidth: works with peer bandwidth upper limit" do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
peer_config = command(:get_peer_config, @peer_id) peer_config = command(:get_peer_config, @peer_id)
@ -617,8 +626,8 @@ module Hbase
remote_wal_dir = "hdfs://srv1:9999/hbase" remote_wal_dir = "hdfs://srv1:9999/hbase"
table_cfs = { "ns3:table1" => [], "ns3:table2" => [], table_cfs = { "ns3:table1" => [], "ns3:table2" => [],
"ns3:table3" => [] } "ns3:table3" => [] }
args = { CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir, args = {CLUSTER_KEY => cluster_key, REMOTE_WAL_DIR => remote_wal_dir,
TABLE_CFS => table_cfs} TABLE_CFS => table_cfs, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
assert_equal(1, command(:list_peers).length) assert_equal(1, command(:list_peers).length)
@ -645,7 +654,7 @@ module Hbase
define_test "get_peer_config: works with simple clusterKey peer" do define_test "get_peer_config: works with simple clusterKey peer" do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
peer_config = command(:get_peer_config, @peer_id) peer_config = command(:get_peer_config, @peer_id)
assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(cluster_key, peer_config.get_cluster_key)
@ -655,14 +664,13 @@ module Hbase
define_test "get_peer_config: works with replicationendpointimpl peer and config params" do define_test "get_peer_config: works with replicationendpointimpl peer and config params" do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
repl_impl = 'org.apache.hadoop.hbase.replication.DummyReplicationEndpoint'
config_params = { "config1" => "value1", "config2" => "value2" } config_params = { "config1" => "value1", "config2" => "value2" }
args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => repl_impl, args = { CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint,
CONFIG => config_params } CONFIG => config_params }
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
peer_config = command(:get_peer_config, @peer_id) peer_config = command(:get_peer_config, @peer_id)
assert_equal(cluster_key, peer_config.get_cluster_key) assert_equal(cluster_key, peer_config.get_cluster_key)
assert_equal(repl_impl, peer_config.get_replication_endpoint_impl) assert_equal(@dummy_endpoint, peer_config.get_replication_endpoint_impl)
assert_equal(2, peer_config.get_configuration.size) assert_equal(2, peer_config.get_configuration.size)
assert_equal("value1", peer_config.get_configuration.get("config1")) assert_equal("value1", peer_config.get_configuration.get("config1"))
#cleanup #cleanup
@ -671,29 +679,27 @@ module Hbase
define_test "list_peer_configs: returns all peers' ReplicationPeerConfig objects" do define_test "list_peer_configs: returns all peers' ReplicationPeerConfig objects" do
cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test" cluster_key = org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster::HOST + ":2181:/hbase-test"
args = { CLUSTER_KEY => cluster_key } args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
peer_id_second = '2' peer_id_second = '2'
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
repl_impl = "org.apache.hadoop.hbase.replication.DummyReplicationEndpoint"
config_params = { "config1" => "value1", "config2" => "value2" } config_params = { "config1" => "value1", "config2" => "value2" }
args2 = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params} args2 = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params}
command(:add_peer, peer_id_second, args2) command(:add_peer, peer_id_second, args2)
peer_configs = command(:list_peer_configs) peer_configs = command(:list_peer_configs)
assert_equal(2, peer_configs.size) assert_equal(2, peer_configs.size)
assert_equal(cluster_key, peer_configs.get(@peer_id).get_cluster_key) assert_equal(cluster_key, peer_configs.get(@peer_id).get_cluster_key)
assert_equal(repl_impl, peer_configs.get(peer_id_second).get_replication_endpoint_impl) assert_equal(@dummy_endpoint, peer_configs.get(peer_id_second).get_replication_endpoint_impl)
#cleanup #cleanup
command(:remove_peer, @peer_id) command(:remove_peer, @peer_id)
command(:remove_peer, peer_id_second) command(:remove_peer, peer_id_second)
end end
define_test "update_peer_config: can update peer config and data" do define_test "update_peer_config: can update peer config and data" do
repl_impl = "org.apache.hadoop.hbase.replication.DummyReplicationEndpoint"
config_params = { "config1" => "value1", "config2" => "value2" } config_params = { "config1" => "value1", "config2" => "value2" }
data_params = {"data1" => "value1", "data2" => "value2"} data_params = {"data1" => "value1", "data2" => "value2"}
args = { ENDPOINT_CLASSNAME => repl_impl, CONFIG => config_params, DATA => data_params} args = {ENDPOINT_CLASSNAME => @dummy_endpoint, CONFIG => config_params, DATA => data_params}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
new_config_params = { "config1" => "new_value1" } new_config_params = { "config1" => "new_value1" }
@ -713,7 +719,7 @@ module Hbase
define_test "append_peer_exclude_namespaces: works with namespaces array" do define_test "append_peer_exclude_namespaces: works with namespaces array" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key} args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
command(:set_peer_replicate_all, @peer_id, true) command(:set_peer_replicate_all, @peer_id, true)
@ -749,7 +755,7 @@ module Hbase
define_test "remove_peer_exclude_namespaces: works with namespaces array" do define_test "remove_peer_exclude_namespaces: works with namespaces array" do
cluster_key = "zk4,zk5,zk6:11000:/hbase-test" cluster_key = "zk4,zk5,zk6:11000:/hbase-test"
args = {CLUSTER_KEY => cluster_key} args = {CLUSTER_KEY => cluster_key, ENDPOINT_CLASSNAME => @dummy_endpoint}
command(:add_peer, @peer_id, args) command(:add_peer, @peer_id, args)
namespaces = ["ns1", "ns2", "ns3"] namespaces = ["ns1", "ns2", "ns3"]