Revert "HBASE-24743 Reject to add a peer which replicate to itself earlier (#2071)"
This reverts commit 6cf013ddc4
.
TestReplicationAdmin and TestReplicationShell are broken on branch-2 and master respectively
This commit is contained in:
parent
3d270ba6f9
commit
32796ad24e
|
@ -822,7 +822,7 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
}
|
}
|
||||||
this.rsGroupInfoManager = RSGroupInfoManager.create(this);
|
this.rsGroupInfoManager = RSGroupInfoManager.create(this);
|
||||||
|
|
||||||
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
|
this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
|
||||||
|
|
||||||
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
|
this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
|
||||||
this.drainingServerTracker.start();
|
this.drainingServerTracker.start();
|
||||||
|
|
|
@ -35,7 +35,6 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||||
|
@ -51,11 +50,9 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||||
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
import org.apache.hadoop.hbase.replication.SyncReplicationState;
|
||||||
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
|
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
|
||||||
|
@ -84,17 +81,11 @@ public class ReplicationPeerManager {
|
||||||
// Only allow to add one sync replication peer concurrently
|
// Only allow to add one sync replication peer concurrently
|
||||||
private final Semaphore syncReplicationPeerLock = new Semaphore(1);
|
private final Semaphore syncReplicationPeerLock = new Semaphore(1);
|
||||||
|
|
||||||
private final String clusterId;
|
|
||||||
|
|
||||||
private final Configuration conf;
|
|
||||||
|
|
||||||
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
|
ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
|
||||||
ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
|
ConcurrentMap<String, ReplicationPeerDescription> peers) {
|
||||||
this.peerStorage = peerStorage;
|
this.peerStorage = peerStorage;
|
||||||
this.queueStorage = queueStorage;
|
this.queueStorage = queueStorage;
|
||||||
this.peers = peers;
|
this.peers = peers;
|
||||||
this.conf = conf;
|
|
||||||
this.clusterId = clusterId;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkQueuesDeleted(String peerId)
|
private void checkQueuesDeleted(String peerId)
|
||||||
|
@ -346,10 +337,11 @@ public class ReplicationPeerManager {
|
||||||
|
|
||||||
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
|
private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
|
||||||
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
|
String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
|
||||||
ReplicationEndpoint endpoint = null;
|
boolean checkClusterKey = true;
|
||||||
if (!StringUtils.isBlank(replicationEndpointImpl)) {
|
if (!StringUtils.isBlank(replicationEndpointImpl)) {
|
||||||
|
// try creating a instance
|
||||||
|
ReplicationEndpoint endpoint;
|
||||||
try {
|
try {
|
||||||
// try creating a instance
|
|
||||||
endpoint = Class.forName(replicationEndpointImpl)
|
endpoint = Class.forName(replicationEndpointImpl)
|
||||||
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
|
.asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
|
@ -357,15 +349,14 @@ public class ReplicationPeerManager {
|
||||||
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
|
"Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
|
||||||
e);
|
e);
|
||||||
}
|
}
|
||||||
|
// do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
|
||||||
|
if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
|
||||||
|
checkClusterKey = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
|
if (checkClusterKey) {
|
||||||
if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
|
|
||||||
checkClusterKey(peerConfig.getClusterKey());
|
checkClusterKey(peerConfig.getClusterKey());
|
||||||
}
|
}
|
||||||
// Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
|
|
||||||
if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
|
|
||||||
checkClusterId(peerConfig.getClusterKey());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (peerConfig.replicateAllUserTables()) {
|
if (peerConfig.replicateAllUserTables()) {
|
||||||
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
// If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
|
||||||
|
@ -510,25 +501,6 @@ public class ReplicationPeerManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
|
|
||||||
String peerClusterId = "";
|
|
||||||
try {
|
|
||||||
// Create the peer cluster config for get peer cluster id
|
|
||||||
Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
|
|
||||||
try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
|
|
||||||
peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
|
|
||||||
}
|
|
||||||
} catch (IOException | KeeperException e) {
|
|
||||||
throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
|
|
||||||
}
|
|
||||||
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
|
|
||||||
// peerClusterId value, which is the same as the source clusterId
|
|
||||||
if (clusterId.equals(peerClusterId)) {
|
|
||||||
throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
|
|
||||||
+ ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
|
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
|
||||||
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
|
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
|
||||||
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
|
.filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
|
||||||
|
@ -539,7 +511,7 @@ public class ReplicationPeerManager {
|
||||||
return queueStorage;
|
return queueStorage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
|
public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
|
||||||
throws ReplicationException {
|
throws ReplicationException {
|
||||||
ReplicationPeerStorage peerStorage =
|
ReplicationPeerStorage peerStorage =
|
||||||
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
|
ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
|
||||||
|
@ -551,7 +523,7 @@ public class ReplicationPeerManager {
|
||||||
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
|
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig, state));
|
||||||
}
|
}
|
||||||
return new ReplicationPeerManager(peerStorage,
|
return new ReplicationPeerManager(peerStorage,
|
||||||
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
|
ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -523,6 +523,16 @@ public class ReplicationSource implements ReplicationSourceInterface {
|
||||||
if(!this.isSourceActive()) {
|
if(!this.isSourceActive()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// In rare case, zookeeper setting may be messed up. That leads to the incorrect
|
||||||
|
// peerClusterId value, which is the same as the source clusterId
|
||||||
|
if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
|
||||||
|
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
|
||||||
|
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
|
||||||
|
+ replicationEndpoint.getClass().getName(), null, false);
|
||||||
|
this.manager.removeSource(this);
|
||||||
|
return;
|
||||||
|
}
|
||||||
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
|
LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
|
||||||
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
|
logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
|
||||||
|
|
||||||
|
|
|
@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
||||||
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
|
HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
|
||||||
|
|
||||||
private final String ID_ONE = "1";
|
private final String ID_ONE = "1";
|
||||||
private static String KEY_ONE;
|
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
|
||||||
private final String ID_TWO = "2";
|
private final String ID_TWO = "2";
|
||||||
private static String KEY_TWO;
|
private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
@ -82,8 +82,6 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
|
||||||
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||||
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
||||||
TEST_UTIL.startMiniCluster();
|
TEST_UTIL.startMiniCluster();
|
||||||
KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
|
|
||||||
KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
|
|
||||||
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ public class TestReplicationAdminForSyncReplication {
|
||||||
Thread[] threads = new Thread[5];
|
Thread[] threads = new Thread[5];
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
String peerId = "id" + i;
|
String peerId = "id" + i;
|
||||||
String clusterKey = TEST_UTIL.getClusterKey() + "-test" + i;
|
String clusterKey = "127.0.0.1:2181:/hbase" + i;
|
||||||
int index = i;
|
int index = i;
|
||||||
threads[i] = new Thread(() -> {
|
threads[i] = new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -113,11 +113,6 @@ public class SerialReplicationTestBase {
|
||||||
protected void doStop() {
|
protected void doStop() {
|
||||||
notifyStopped();
|
notifyStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean canReplicateToSameCluster() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
@ -33,14 +34,17 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.ClusterMetrics;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.KeyValue;
|
import org.apache.hadoop.hbase.KeyValue;
|
||||||
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
import org.apache.hadoop.hbase.MiniHBaseCluster;
|
||||||
|
import org.apache.hadoop.hbase.ServerMetrics;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.Admin;
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
@ -68,7 +72,9 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
import org.apache.hadoop.hbase.util.HFileTestUtil;
|
||||||
import org.apache.hadoop.hbase.wal.WALEdit;
|
import org.apache.hadoop.hbase.wal.WALEdit;
|
||||||
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
|
@ -170,16 +176,40 @@ public class TestMasterReplication {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the replication scenario 0 -> 0. By default
|
* Tests the replication scenario 0 -> 0. By default
|
||||||
* {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
|
* {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
|
||||||
* the replication peer should not be added.
|
* ReplicationSource should terminate, and no further logs should get enqueued
|
||||||
*/
|
*/
|
||||||
@Test(expected = DoNotRetryIOException.class)
|
@Test
|
||||||
public void testLoopedReplication()
|
public void testLoopedReplication() throws Exception {
|
||||||
throws Exception {
|
|
||||||
LOG.info("testLoopedReplication");
|
LOG.info("testLoopedReplication");
|
||||||
startMiniClusters(1);
|
startMiniClusters(1);
|
||||||
createTableOnClusters(table);
|
createTableOnClusters(table);
|
||||||
addPeer("1", 0, 0);
|
addPeer("1", 0, 0);
|
||||||
|
Thread.sleep(SLEEP_TIME);
|
||||||
|
|
||||||
|
// wait for source to terminate
|
||||||
|
final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
|
||||||
|
Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
|
||||||
|
@Override
|
||||||
|
public boolean evaluate() throws Exception {
|
||||||
|
ClusterMetrics clusterStatus = utilities[0].getAdmin()
|
||||||
|
.getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
|
||||||
|
ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
|
||||||
|
List<ReplicationLoadSource> replicationLoadSourceList =
|
||||||
|
serverLoad.getReplicationLoadSourceList();
|
||||||
|
return replicationLoadSourceList.isEmpty();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Table[] htables = getHTablesOnClusters(tableName);
|
||||||
|
putAndWait(row, famName, htables[0], htables[0]);
|
||||||
|
rollWALAndWait(utilities[0], table.getTableName(), row);
|
||||||
|
ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
|
||||||
|
String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
|
||||||
|
ZNodePaths.joinZNode("replication", "rs"));
|
||||||
|
List<String> listChildrenNoWatch =
|
||||||
|
ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
|
||||||
|
assertEquals(0, listChildrenNoWatch.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -490,11 +490,6 @@ public class TestReplicationEndpoint extends TestReplicationBase {
|
||||||
stoppedCount.incrementAndGet();
|
stoppedCount.incrementAndGet();
|
||||||
notifyStopped();
|
notifyStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean canReplicateToSameCluster() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class InterClusterReplicationEndpointForTest
|
public static class InterClusterReplicationEndpointForTest
|
||||||
|
|
|
@ -127,11 +127,6 @@ public class TestRaceWhenCreatingReplicationSource {
|
||||||
protected void doStop() {
|
protected void doStop() {
|
||||||
notifyStopped();
|
notifyStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean canReplicateToSameCluster() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
|
|
@ -174,9 +174,8 @@ public class TestHBaseFsckCleanReplicationBarriers {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void createPeer() throws IOException {
|
public static void createPeer() throws IOException {
|
||||||
ReplicationPeerConfig rpc =
|
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
|
||||||
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
|
.setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
|
||||||
.setSerial(true).build();
|
|
||||||
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
|
UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
|
||||||
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
|
UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue