HBASE-15888 Extend HBASE-12769 for bulk load data replication

Ashish Singhi 2016-06-03 18:42:00 +05:30
parent 72d3f2a868
commit 0cbce07626
2 changed files with 61 additions and 4 deletions

ReplicationPeersZKImpl.java

@@ -550,6 +550,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers
          }
        }
      }
      // Check for hfile-refs queue
      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
        throw new ReplicationException("Undeleted queue for peerId: " + peerId
            + ", found in hfile-refs node path " + hfileRefsZNode);
      }
    } catch (KeeperException e) {
      throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
    }
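A note on the guard added above: ZKUtil.checkExists returns -1 when a znode is absent, so the hfile-refs queue is only consulted when its parent znode exists, and peer removal fails while that peer still owns an hfile-refs queue. Below is a minimal, dependency-free sketch of the same shape of check; HFileRefsView and checkHFileRefsQueueDeleted are hypothetical stand-ins for the real ZKUtil/ReplicationQueuesClient calls, not part of this patch.

import java.util.List;

// Sketch only: HFileRefsView is a hypothetical stand-in for ZKUtil + ReplicationQueuesClient.
interface HFileRefsView {
  int checkExists(String znode);                  // -1 when the znode does not exist
  List<String> getAllPeersFromHFileRefsQueue();   // peer ids with queued bulk-loaded hfiles
}

class PeerRemovalGuardSketch {
  static void checkHFileRefsQueueDeleted(HFileRefsView view, String hfileRefsZNode, String peerId) {
    // Skip entirely when the hfile-refs znode is absent; otherwise refuse to proceed
    // while the peer still has an hfile-refs queue registered under it.
    if (-1 != view.checkExists(hfileRefsZNode)
        && view.getAllPeersFromHFileRefsQueue().contains(peerId)) {
      throw new IllegalStateException("Undeleted hfile-refs queue for peerId: " + peerId);
    }
  }
}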

ReplicationChecker.java

@@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ReplicationChecker {
  private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
  private final ZooKeeperWatcher zkw;
  private final ErrorReporter errorReporter;
  private final ReplicationQueuesClient queuesClient;
  private final ReplicationPeers replicationPeers;
  private final ReplicationQueueDeletor queueDeletor;
  // replicator with its queueIds for removed peers
  private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
  // undeleted hfile-refs queue ids (peer ids) for removed peers
  private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
  private final String hfileRefsZNode;

  public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
      ErrorReporter errorReporter) throws IOException {
    try {
      this.zkw = zkw;
      this.errorReporter = errorReporter;
      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
      this.queuesClient.init();
@@ -71,6 +76,13 @@ public class ReplicationChecker {
    } catch (ReplicationException e) {
      throw new IOException("failed to construct ReplicationChecker", e);
    }

    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
    String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
    String hfileRefsZNodeName =
        conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
  }

  public boolean hasUnDeletedQueues() {
@@ -103,6 +115,30 @@
    } catch (KeeperException ke) {
      throw new IOException(ke);
    }

    checkUnDeletedHFileRefsQueues(peerIds);
  }

  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
    try {
      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
        return;
      }
      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
      Set<String> peers = new HashSet<>(listOfPeers);
      peers.removeAll(peerIds);
      if (!peers.isEmpty()) {
        undeletedHFileRefsQueueIds.addAll(peers);
        String msg =
            "Undeleted replication hfile-refs queue for removed peer found: "
                + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
          msg);
      }
    } catch (KeeperException e) {
      throw new IOException("Failed to get list of all peers from hfile-refs znode "
          + hfileRefsZNode, e);
    }
  }

  private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
@@ -130,5 +166,20 @@ public class ReplicationChecker {
        queueDeletor.removeQueue(replicator, queueId);
      }
    }
    fixUnDeletedHFileRefsQueue();
  }

  private void fixUnDeletedHFileRefsQueue() throws IOException {
    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
      String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
      try {
        ZKUtil.deleteNodeRecursively(this.zkw, node);
        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
            + hfileRefsZNode);
      } catch (KeeperException e) {
        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
            + " from path " + hfileRefsZNode, e);
      }
    }
  }
}
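For orientation on the znode layout these checks operate on: the ReplicationChecker constructor joins the base znode, the replication znode name, and the hfile-refs znode name, and each peer with queued bulk-loaded hfiles owns a child znode under that path. A minimal, dependency-free sketch of that composition follows; the "/hbase" base znode and the "hfile-refs" name are assumed defaults (only "replication" is visible as a default in this diff), and joinZNode simply mirrors ZKUtil.joinZNode.

// Dependency-free sketch of how hfileRefsZNode is composed in the ReplicationChecker
// constructor above. "/hbase" and "hfile-refs" are assumed defaults, not taken from this diff.
public class HFileRefsZNodeSketch {
  private static String joinZNode(String parent, String child) {
    return parent + "/" + child;                    // mirrors ZKUtil.joinZNode
  }

  public static void main(String[] args) {
    String baseZNode = "/hbase";                    // zookeeper.znode.parent (assumed default)
    String replicationZNode = joinZNode(baseZNode, "replication");
    String hfileRefsZNode = joinZNode(replicationZNode, "hfile-refs");
    // Each peer with queued bulk-loaded hfiles owns a child: /hbase/replication/hfile-refs/<peerId>.
    // checkUnDeletedHFileRefsQueues() flags children whose peer no longer exists, and
    // fixUnDeletedHFileRefsQueue() removes them with a recursive delete.
    System.out.println(hfileRefsZNode);             // /hbase/replication/hfile-refs
  }
}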