HBASE-15393 Enable table replication command will fail when parent znode is not default in peer cluster (Ashish Singhi)

This commit is contained in:
tedyu 2016-03-05 06:28:09 -08:00
parent 27cf0c8c36
commit d083e4f29f
2 changed files with 11 additions and 30 deletions

View File

@ -18,6 +18,9 @@
*/ */
package org.apache.hadoop.hbase.client.replication; package org.apache.hadoop.hbase.client.replication;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -54,11 +57,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
/** /**
* <p> * <p>
@ -595,7 +593,7 @@ public class ReplicationAdmin implements Closeable {
*/ */
private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits) private void checkAndSyncTableDescToPeers(final TableName tableName, final byte[][] splits)
throws IOException { throws IOException {
List<ReplicationPeer> repPeers = listValidReplicationPeers(); List<ReplicationPeer> repPeers = listReplicationPeers();
if (repPeers == null || repPeers.size() <= 0) { if (repPeers == null || repPeers.size() <= 0) {
throw new IllegalArgumentException("Found no peer cluster for replication."); throw new IllegalArgumentException("Found no peer cluster for replication.");
} }
@ -636,46 +634,29 @@ public class ReplicationAdmin implements Closeable {
} }
@VisibleForTesting @VisibleForTesting
List<ReplicationPeer> listValidReplicationPeers() { List<ReplicationPeer> listReplicationPeers() {
Map<String, ReplicationPeerConfig> peers = listPeerConfigs(); Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
if (peers == null || peers.size() <= 0) { if (peers == null || peers.size() <= 0) {
return null; return null;
} }
List<ReplicationPeer> validPeers = new ArrayList<ReplicationPeer>(peers.size()); List<ReplicationPeer> listOfPeers = new ArrayList<ReplicationPeer>(peers.size());
for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) { for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
String peerId = peerEntry.getKey(); String peerId = peerEntry.getKey();
Stat s = null;
try { try {
Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId); Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
Configuration peerConf = pair.getSecond(); Configuration peerConf = pair.getSecond();
ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(), ReplicationPeer peer = new ReplicationPeerZKImpl(peerConf, peerId, pair.getFirst(),
parseTableCFsFromConfig(this.getPeerTableCFs(peerId))); parseTableCFsFromConfig(this.getPeerTableCFs(peerId)));
s = listOfPeers.add(peer);
zkw.getRecoverableZooKeeper().exists(peerConf.get(HConstants.ZOOKEEPER_ZNODE_PARENT),
null);
if (null == s) {
LOG.info(peerId + ' ' + pair.getFirst().getClusterKey() + " is invalid now.");
continue;
}
validPeers.add(peer);
} catch (ReplicationException e) { } catch (ReplicationException e) {
LOG.warn("Failed to get valid replication peers. " LOG.warn("Failed to get valid replication peers. "
+ "Error connecting to peer cluster with peerId=" + peerId); + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
+ e.getMessage());
LOG.debug("Failure details to get valid replication peers.", e); LOG.debug("Failure details to get valid replication peers.", e);
continue; continue;
} catch (KeeperException e) {
LOG.warn("Failed to get valid replication peers. KeeperException code="
+ e.code().intValue());
LOG.debug("Failure details to get valid replication peers.", e);
continue;
} catch (InterruptedException e) {
LOG.warn("Failed to get valid replication peers due to InterruptedException.");
LOG.debug("Failure details to get valid replication peers.", e);
Thread.currentThread().interrupt();
continue;
} }
} }
return validPeers; return listOfPeers;
} }
/** /**

View File

@ -136,7 +136,7 @@ public class TestReplicationAdmin {
config.getConfiguration().put("key2", "value2"); config.getConfiguration().put("key2", "value2");
admin.addPeer(ID_ONE, config, null); admin.addPeer(ID_ONE, config, null);
List<ReplicationPeer> peers = admin.listValidReplicationPeers(); List<ReplicationPeer> peers = admin.listReplicationPeers();
assertEquals(1, peers.size()); assertEquals(1, peers.size());
ReplicationPeer peerOne = peers.get(0); ReplicationPeer peerOne = peers.get(0);
assertNotNull(peerOne); assertNotNull(peerOne);