HBASE-24743 Reject adding a peer which replicates to itself earlier (#2071)
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
parent 89cf76c2cd
commit 5db3ec2cfb
@@ -801,7 +801,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this);
     this.splitOrMergeTracker.start();
 
-    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf);
+    this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf, clusterId);
 
     this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager);
     this.drainingServerTracker.start();
@@ -31,6 +31,7 @@ import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
@@ -45,9 +46,11 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.zookeeper.KeeperException;
 
 /**
  * Manages and performs all replication admin operations.
@@ -63,11 +66,17 @@ public class ReplicationPeerManager {
 
   private final ConcurrentMap<String, ReplicationPeerDescription> peers;
 
+  private final String clusterId;
+
+  private final Configuration conf;
+
   ReplicationPeerManager(ReplicationPeerStorage peerStorage, ReplicationQueueStorage queueStorage,
-      ConcurrentMap<String, ReplicationPeerDescription> peers) {
+      ConcurrentMap<String, ReplicationPeerDescription> peers, Configuration conf, String clusterId) {
     this.peerStorage = peerStorage;
     this.queueStorage = queueStorage;
     this.peers = peers;
+    this.conf = conf;
+    this.clusterId = clusterId;
   }
 
   private void checkQueuesDeleted(String peerId)
@@ -245,11 +254,10 @@ public class ReplicationPeerManager {
 
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
     String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
-    boolean checkClusterKey = true;
+    ReplicationEndpoint endpoint = null;
     if (!StringUtils.isBlank(replicationEndpointImpl)) {
-      // try creating a instance
-      ReplicationEndpoint endpoint;
       try {
+        // try creating a instance
         endpoint = Class.forName(replicationEndpointImpl)
           .asSubclass(ReplicationEndpoint.class).getDeclaredConstructor().newInstance();
       } catch (Throwable e) {
@@ -257,14 +265,15 @@ public class ReplicationPeerManager {
           "Can not instantiate configured replication endpoint class=" + replicationEndpointImpl,
           e);
       }
-      // do not check cluster key if we are not HBaseInterClusterReplicationEndpoint
-      if (!(endpoint instanceof HBaseInterClusterReplicationEndpoint)) {
-        checkClusterKey = false;
-      }
     }
-    if (checkClusterKey) {
+    // Default is HBaseInterClusterReplicationEndpoint and only it need to check cluster key
+    if (endpoint == null || endpoint instanceof HBaseInterClusterReplicationEndpoint) {
       checkClusterKey(peerConfig.getClusterKey());
     }
+    // Default is HBaseInterClusterReplicationEndpoint which cannot replicate to same cluster
+    if (endpoint == null || !endpoint.canReplicateToSameCluster()) {
+      checkClusterId(peerConfig.getClusterKey());
+    }
 
     if (peerConfig.replicateAllUserTables()) {
       // If replicate_all flag is true, it means all user tables will be replicated to peer cluster.
@@ -357,6 +366,25 @@ public class ReplicationPeerManager {
     }
   }
 
+  private void checkClusterId(String clusterKey) throws DoNotRetryIOException {
+    String peerClusterId = "";
+    try {
+      // Create the peer cluster config for get peer cluster id
+      Configuration peerConf = HBaseConfiguration.createClusterConf(conf, clusterKey);
+      try (ZKWatcher zkWatcher = new ZKWatcher(peerConf, this + "check-peer-cluster-id", null)) {
+        peerClusterId = ZKClusterId.readClusterIdZNode(zkWatcher);
+      }
+    } catch (IOException | KeeperException e) {
+      throw new DoNotRetryIOException("Can't get peerClusterId for clusterKey=" + clusterKey, e);
+    }
+    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
+    // peerClusterId value, which is the same as the source clusterId
+    if (clusterId.equals(peerClusterId)) {
+      throw new DoNotRetryIOException("Invalid cluster key: " + clusterKey
+        + ", should not replicate to itself for HBaseInterClusterReplicationEndpoint");
+    }
+  }
+
   public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
     return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
       .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
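The checkClusterId helper added above is the heart of this change: at addReplicationPeer time the master reads the prospective peer's cluster id from the peer's ZooKeeper and rejects the request when it matches the local cluster id. As a client-facing illustration only (this sketch is not part of the commit; the connection bootstrap, the peer id "loop" and the loopback cluster key are assumptions), adding a peer whose key points back at the same cluster now fails fast with DoNotRetryIOException instead of being detected later by the region server's ReplicationSource:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.DoNotRetryIOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class AddLoopedPeerExample {
      public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
            Admin admin = conn.getAdmin()) {
          // Hypothetical cluster key that resolves to this same cluster.
          ReplicationPeerConfig loopedPeer = ReplicationPeerConfig.newBuilder()
              .setClusterKey("127.0.0.1:2181:/hbase").build();
          try {
            // With this change the master rejects the peer up front rather than
            // letting ReplicationSource discover the loop and terminate later.
            admin.addReplicationPeer("loop", loopedPeer);
          } catch (DoNotRetryIOException e) {
            System.out.println("Rejected as expected: " + e.getMessage());
          }
        }
      }
    }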
@@ -367,7 +395,7 @@ public class ReplicationPeerManager {
     return queueStorage;
   }
 
-  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf)
+  public static ReplicationPeerManager create(ZKWatcher zk, Configuration conf, String clusterId)
       throws ReplicationException {
     ReplicationPeerStorage peerStorage =
       ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
@@ -378,7 +406,7 @@ public class ReplicationPeerManager {
       peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, peerConfig));
     }
     return new ReplicationPeerManager(peerStorage,
-      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers);
+      ReplicationStorageFactory.getReplicationQueueStorage(zk, conf), peers, conf, clusterId);
   }
 
   /**
@@ -524,16 +524,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
     if (!this.isSourceActive()) {
       return;
     }
 
-    // In rare case, zookeeper setting may be messed up. That leads to the incorrect
-    // peerClusterId value, which is the same as the source clusterId
-    if (clusterId.equals(peerClusterId) && !replicationEndpoint.canReplicateToSameCluster()) {
-      this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
-        + peerClusterId + " which is not allowed by ReplicationEndpoint:"
-        + replicationEndpoint.getClass().getName(), null, false);
-      this.manager.removeSource(this);
-      return;
-    }
-
     LOG.info("{} Source: {}, is now replicating from cluster: {}; to peer cluster: {};",
       logPeerId(), this.replicationQueueInfo.getQueueId(), clusterId, peerClusterId);
@@ -71,9 +71,9 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     HBaseClassTestRule.forClass(TestAsyncReplicationAdminApi.class);
 
   private final String ID_ONE = "1";
-  private final String KEY_ONE = "127.0.0.1:2181:/hbase";
+  private static String KEY_ONE;
   private final String ID_TWO = "2";
-  private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
+  private static String KEY_TWO;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -82,6 +82,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
     TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
     TEST_UTIL.startMiniCluster();
+    KEY_ONE = TEST_UTIL.getClusterKey() + "-test1";
+    KEY_TWO = TEST_UTIL.getClusterKey() + "-test2";
     ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
   }
 
@@ -113,6 +113,11 @@ public class SerialReplicationTestBase {
     protected void doStop() {
       notifyStopped();
     }
+
+    @Override
+    public boolean canReplicateToSameCluster() {
+      return true;
+    }
   }
 
   @BeforeClass
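The canReplicateToSameCluster() override added above is needed because of the new check in ReplicationPeerManager: test endpoints that deliberately replicate back into the same mini cluster must opt in, otherwise adding their looped peer would now be rejected. The helper below is an illustrative restatement of that decision rule, written for this note rather than taken from HBase, and assumes only the existing ReplicationEndpoint interface:

    import org.apache.hadoop.hbase.replication.ReplicationEndpoint;

    // Restates the rule applied in ReplicationPeerManager.checkPeerConfig: a peer is
    // rejected when its cluster id equals the local one and the configured endpoint
    // does not allow same-cluster replication.
    public final class SelfReplicationRule {

      private SelfReplicationRule() {
      }

      /**
       * @param endpoint the configured endpoint instance, or null when the peer uses
       *          the default HBaseInterClusterReplicationEndpoint
       */
      public static boolean rejects(ReplicationEndpoint endpoint, String localClusterId,
          String peerClusterId) {
        boolean allowsLoop = endpoint != null && endpoint.canReplicateToSameCluster();
        return !allowsLoop && localClusterId.equals(peerClusterId);
      }
    }

Because the test endpoint above returns true from canReplicateToSameCluster(), rejects(...) stays false for it even when the source and peer cluster ids are identical.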
@@ -25,7 +25,6 @@ import static org.junit.Assert.fail;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.List;
 import java.util.Optional;
 import java.util.Random;
@@ -34,17 +33,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.ServerMetrics;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -72,9 +68,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.ClassRule;
@@ -176,40 +170,16 @@ public class TestMasterReplication {
 
   /**
    * Tests the replication scenario 0 -> 0. By default
-   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
-   * ReplicationSource should terminate, and no further logs should get enqueued
+   * {@link org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint},
+   * the replication peer should not be added.
    */
-  @Test
-  public void testLoopedReplication() throws Exception {
+  @Test(expected = DoNotRetryIOException.class)
+  public void testLoopedReplication()
+    throws Exception {
     LOG.info("testLoopedReplication");
     startMiniClusters(1);
     createTableOnClusters(table);
     addPeer("1", 0, 0);
-    Thread.sleep(SLEEP_TIME);
-
-    // wait for source to terminate
-    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
-    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
-      @Override
-      public boolean evaluate() throws Exception {
-        ClusterMetrics clusterStatus = utilities[0].getAdmin()
-          .getClusterMetrics(EnumSet.of(ClusterMetrics.Option.LIVE_SERVERS));
-        ServerMetrics serverLoad = clusterStatus.getLiveServerMetrics().get(rsName);
-        List<ReplicationLoadSource> replicationLoadSourceList =
-          serverLoad.getReplicationLoadSourceList();
-        return replicationLoadSourceList.isEmpty();
-      }
-    });
-
-    Table[] htables = getHTablesOnClusters(tableName);
-    putAndWait(row, famName, htables[0], htables[0]);
-    rollWALAndWait(utilities[0], table.getTableName(), row);
-    ZKWatcher zkw = utilities[0].getZooKeeperWatcher();
-    String queuesZnode = ZNodePaths.joinZNode(zkw.getZNodePaths().baseZNode,
-      ZNodePaths.joinZNode("replication", "rs"));
-    List<String> listChildrenNoWatch =
-      ZKUtil.listChildrenNoWatch(zkw, ZNodePaths.joinZNode(queuesZnode, rsName.toString()));
-    assertEquals(0, listChildrenNoWatch.size());
   }
 
   /**
@@ -500,6 +500,11 @@ public class TestReplicationEndpoint extends TestReplicationBase {
       stoppedCount.incrementAndGet();
       notifyStopped();
     }
+
+    @Override
+    public boolean canReplicateToSameCluster() {
+      return true;
+    }
   }
 
   public static class InterClusterReplicationEndpointForTest
@@ -127,6 +127,11 @@ public class TestRaceWhenCreatingReplicationSource {
     protected void doStop() {
       notifyStopped();
     }
+
+    @Override
+    public boolean canReplicateToSameCluster() {
+      return true;
+    }
   }
 
   @BeforeClass
@@ -174,8 +174,9 @@ public class TestHBaseFsckCleanReplicationBarriers {
   }
 
   public static void createPeer() throws IOException {
-    ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
-      .setClusterKey(UTIL.getClusterKey()).setSerial(true).build();
+    ReplicationPeerConfig rpc =
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL.getClusterKey() + "-test")
+        .setSerial(true).build();
     UTIL.getAdmin().addReplicationPeer(PEER_1, rpc);
     UTIL.getAdmin().addReplicationPeer(PEER_2, rpc);
   }