From 950a09b03cf1a306ed5a2754c11457daf30e7d23 Mon Sep 17 00:00:00 2001
From: Ashish Singhi
Date: Fri, 3 Jun 2016 18:46:41 +0530
Subject: [PATCH] HBASE-15888 Extend HBASE-12769 for bulk load data replication

---
Review notes (text between "---" and the diffstat is not part of the commit):
- fixUnDeletedHFileRefsQueue() now passes the caught KeeperException as the
  cause of the IOException it throws, matching checkUnDeletedHFileRefsQueues()
  in this same patch; previously the cause was dropped.
- This copy was rebuilt from a flattened rendering of the patch. Line breaks,
  indentation, the trailing whitespace on the removed blank lines, and the
  generic type arguments (String / List<String>) are reconstructions --
  TODO confirm against the tree before applying.
- The "index" blob ids below are those of the original mail and will not match
  the amended post-image of ReplicationChecker.java; regenerate the patch with
  git format-patch after applying.

 .../replication/ReplicationPeersZKImpl.java   |  6 ++
 .../hbase/util/hbck/ReplicationChecker.java   | 59 +++++++++++++++++--
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index c0c3f7ea536..076167e7114 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -567,6 +567,12 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
           }
         }
       }
+      // Check for hfile-refs queue
+      if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
+          && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
+        throw new ReplicationException("Undeleted queue for peerId: " + peerId
+            + ", found in hfile-refs node path " + hfileRefsZNode);
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index bf44a50677b..64212c905a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -51,16 +51,21 @@ import org.apache.zookeeper.KeeperException;
 @InterfaceAudience.Private
 public class ReplicationChecker {
   private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
+  private final ZooKeeperWatcher zkw;
   private ErrorReporter errorReporter;
   private ReplicationQueuesClient queuesClient;
   private ReplicationPeers replicationPeers;
   private ReplicationQueueDeletor queueDeletor;
   // replicator with its queueIds for removed peers
   private Map<String, List<String>> undeletedQueueIds = new HashMap<String, List<String>>();
-  
+  // Set of undeleted hfile refs queue Ids
+  private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
+  private final String hfileRefsZNode;
+
   public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection,
       ErrorReporter errorReporter) throws IOException {
     try {
+      this.zkw = zkw;
       this.errorReporter = errorReporter;
       this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection);
       this.queuesClient.init();
@@ -71,6 +76,13 @@ public class ReplicationChecker {
     } catch (ReplicationException e) {
       throw new IOException("failed to construct ReplicationChecker", e);
     }
+
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName);
+    String hfileRefsZNodeName =
+        conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
+    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
   }
 
   public boolean hasUnDeletedQueues() {
@@ -103,13 +115,37 @@ public class ReplicationChecker {
     } catch (KeeperException ke) {
       throw new IOException(ke);
     }
+
+    checkUnDeletedHFileRefsQueues(peerIds);
   }
-  
+
+  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
+    try {
+      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+        return;
+      }
+      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
+      Set<String> peers = new HashSet<>(listOfPeers);
+      peers.removeAll(peerIds);
+      if (!peers.isEmpty()) {
+        undeletedHFileRefsQueueIds.addAll(peers);
+        String msg =
+            "Undeleted replication hfile-refs queue for removed peer found: "
+                + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
+        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+          msg);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed to get list of all peers from hfile-refs znode "
+          + hfileRefsZNode, e);
+    }
+  }
+
   private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
     public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
       super(zk, conf, abortable);
     }
-    
+
     public void removeQueue(String replicator, String queueId) throws IOException {
       String queueZnodePath =
         ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator), queueId);
@@ -122,7 +158,7 @@ public class ReplicationChecker {
       }
     }
   }
-  
+
   public void fixUnDeletedQueues() throws IOException {
     for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
       String replicator = replicatorAndQueueIds.getKey();
@@ -130,5 +166,20 @@ public class ReplicationChecker {
         queueDeletor.removeQueue(replicator, queueId);
       }
     }
+    fixUnDeletedHFileRefsQueue();
+  }
+
+  private void fixUnDeletedHFileRefsQueue() throws IOException {
+    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
+      String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
+      try {
+        ZKUtil.deleteNodeRecursively(this.zkw, node);
+        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
+            + hfileRefsZNode);
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+            + " from path " + hfileRefsZNode, e);
+      }
+    }
   }
 }