From d87b05f040608e7ffa7908246d914d5ff1353943 Mon Sep 17 00:00:00 2001 From: tedyu Date: Sun, 11 Dec 2016 07:43:10 -0800 Subject: [PATCH] HBASE-16336 Removing peers seems to be leaving spare queues (Guanghao Zhang) --- .../replication/ReplicationQueuesZKImpl.java | 2 +- .../apache/hadoop/hbase/master/HMaster.java | 11 + .../cleaner/ReplicationZKNodeCleaner.java | 210 ++++++++++++++++++ .../ReplicationZKNodeCleanerChore.java | 55 +++++ .../hbase/util/hbck/ReplicationChecker.java | 141 +++--------- .../cleaner/TestReplicationZKNodeCleaner.java | 113 ++++++++++ 6 files changed, 418 insertions(+), 114 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java index 4a4c8cf25c3..a903159fae2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java @@ -69,7 +69,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R /** Znode containing all replication queues for this region server. */ private String myQueuesZnode; /** Name of znode we use to lock during failover */ - private final static String RS_LOCK_ZNODE = "lock"; + public final static String RS_LOCK_ZNODE = "lock"; private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index cf54397c916..b68ca55a947 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -99,6 +99,8 @@ import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.master.cleaner.LogCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner; import org.apache.hadoop.hbase.master.cleaner.ReplicationZKLockCleanerChore; +import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; +import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore; import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer; import org.apache.hadoop.hbase.master.normalizer.RegionNormalizerChore; @@ -324,6 +326,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { CatalogJanitor catalogJanitorChore; private ReplicationZKLockCleanerChore replicationZKLockCleanerChore; + private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore; private ReplicationMetaCleaner replicationMetaCleaner; private LogCleaner logCleaner; private HFileCleaner hfileCleaner; @@ -1182,6 +1185,13 @@ public class HMaster extends HRegionServer implements MasterServices, Server { LOG.error("start replicationZKLockCleanerChore failed", e); } } + try { + replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval, + new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this)); + getChoreService().scheduleChore(replicationZKNodeCleanerChore); + } catch (Exception e) { + LOG.error("start replicationZKNodeCleanerChore failed", e); + } try { replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval); getChoreService().scheduleChore(replicationMetaCleaner); @@ -1222,6 +1232,7 @@ public class HMaster extends HRegionServer implements MasterServices, Server { if (this.logCleaner != null) this.logCleaner.cancel(true); if (this.hfileCleaner != null) this.hfileCleaner.cancel(true); if (this.replicationZKLockCleanerChore != null) this.replicationZKLockCleanerChore.cancel(true); + if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true); if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true); if (this.quotaManager != null) this.quotaManager.stop(); if (this.activeMasterManager != null) this.activeMasterManager.stop(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java new file mode 100644 index 00000000000..8311b8d804b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java @@ -0,0 +1,210 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.cleaner; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; +import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; +import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.zookeeper.KeeperException; + +/** + * Used to clean the replication queues belonging to the peer which does not exist. + */ +@InterfaceAudience.Private +public class ReplicationZKNodeCleaner { + private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleaner.class); + private final ZooKeeperWatcher zkw; + private final ReplicationQueuesClient queuesClient; + private final ReplicationPeers replicationPeers; + private final ReplicationQueueDeletor queueDeletor; + private final boolean useMulti; + + public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, Abortable abortable) + throws IOException { + try { + this.zkw = zkw; + this.queuesClient = ReplicationFactory + .getReplicationQueuesClient(zkw, conf, abortable); + this.queuesClient.init(); + this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, + abortable); + this.replicationPeers.init(); + this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable); + this.useMulti = conf.getBoolean(HConstants.ZOOKEEPER_USEMULTI, true); + } catch (Exception e) { + throw new IOException("failed to construct ReplicationZKNodeCleaner", e); + } + } + + /** + * @return undeletedQueues replicator with its queueIds for removed peers + * @throws IOException + */ + public Map> getUnDeletedQueues() throws IOException { + Map> undeletedQueues = new HashMap<>(); + Set peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); + try { + List replicators = this.queuesClient.getListOfReplicators(); + for (String replicator : replicators) { + List queueIds = this.queuesClient.getAllQueues(replicator); + for (String queueId : queueIds) { + if (!useMulti && queueId.equals(ReplicationQueuesZKImpl.RS_LOCK_ZNODE)) { + continue; + } + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (!peerIds.contains(queueInfo.getPeerId())) { + if (!undeletedQueues.containsKey(replicator)) { + undeletedQueues.put(replicator, new ArrayList()); + } + undeletedQueues.get(replicator).add(queueId); + if (LOG.isDebugEnabled()) { + LOG.debug("Undeleted replication queue for removed peer found: " + + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", + queueInfo.getPeerId(), replicator, queueId)); + } + } + } + } + } catch (KeeperException ke) { + throw new IOException("Failed to get the replication queues of all replicators", ke); + } + return undeletedQueues; + } + + /** + * @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in + * hfile-refs queue + * @throws IOException + */ + public Set getUnDeletedHFileRefsQueues() throws IOException { + Set undeletedHFileRefsQueue = new HashSet<>(); + Set peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds()); + String hfileRefsZNode = queueDeletor.getHfileRefsZNode(); + try { + if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) { + return null; + } + List listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue(); + Set peers = new HashSet<>(listOfPeers); + peers.removeAll(peerIds); + if (!peers.isEmpty()) { + undeletedHFileRefsQueue.addAll(peers); + } + } catch (KeeperException e) { + throw new IOException("Failed to get list of all peers from hfile-refs znode " + + hfileRefsZNode, e); + } + return undeletedHFileRefsQueue; + } + + private class ReplicationQueueDeletor extends ReplicationStateZKBase { + + public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { + super(zk, conf, abortable); + } + + /** + * @param replicator The regionserver which has undeleted queue + * @param queueId The undeleted queue id + * @throws IOException + */ + public void removeQueue(final String replicator, final String queueId) throws IOException { + String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator), + queueId); + try { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) { + ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); + LOG.info("Successfully removed replication queue, replicator: " + replicator + + ", queueId: " + queueId); + } + } catch (KeeperException e) { + throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: " + + queueId); + } + } + + /** + * @param hfileRefsQueueId The undeleted hfile-refs queue id + * @throws IOException + */ + public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException { + String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId); + try { + if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) { + ZKUtil.deleteNodeRecursively(this.zookeeper, node); + LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path " + + hfileRefsZNode); + } + } catch (KeeperException e) { + throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId + + " from path " + hfileRefsZNode); + } + } + + String getHfileRefsZNode() { + return this.hfileRefsZNode; + } + } + + /** + * Remove the undeleted replication queue's zk node for removed peers. + * @param undeletedQueues replicator with its queueIds for removed peers + * @throws IOException + */ + public void removeQueues(final Map> undeletedQueues) throws IOException { + for (Entry> replicatorAndQueueIds : undeletedQueues.entrySet()) { + String replicator = replicatorAndQueueIds.getKey(); + for (String queueId : replicatorAndQueueIds.getValue()) { + queueDeletor.removeQueue(replicator, queueId); + } + } + } + + /** + * Remove the undeleted hfile-refs queue's zk node for removed peers. + * @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in + * hfile-refs queue + * @throws IOException + */ + public void removeHFileRefsQueues(final Set undeletedHFileRefsQueues) throws IOException { + for (String hfileRefsQueueId : undeletedHFileRefsQueues) { + queueDeletor.removeHFileRefsQueue(hfileRefsQueueId); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java new file mode 100644 index 00000000000..4bc1244f564 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.cleaner; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.Stoppable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Chore that will clean the replication queues belonging to the peer which does not exist. + */ +@InterfaceAudience.Private +public class ReplicationZKNodeCleanerChore extends ScheduledChore { + private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleanerChore.class); + private final ReplicationZKNodeCleaner cleaner; + + public ReplicationZKNodeCleanerChore(Stoppable stopper, int period, + ReplicationZKNodeCleaner cleaner) { + super("ReplicationZKNodeCleanerChore", stopper, period); + this.cleaner = cleaner; + } + + @Override + protected void chore() { + try { + Map> undeletedQueues = cleaner.getUnDeletedQueues(); + cleaner.removeQueues(undeletedQueues); + } catch (IOException e) { + LOG.warn("Failed to clean replication zk node", e); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java index 64212c905a3..dcb0010e9ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.util.hbck; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -27,159 +26,75 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.HConnection; -import org.apache.hadoop.hbase.replication.ReplicationException; -import org.apache.hadoop.hbase.replication.ReplicationFactory; -import org.apache.hadoop.hbase.replication.ReplicationPeers; +import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; -import org.apache.hadoop.hbase.replication.ReplicationQueuesClient; -import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.util.HBaseFsck; import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; -import org.apache.zookeeper.KeeperException; /* * Check and fix undeleted replication queues for removed peerId. */ @InterfaceAudience.Private public class ReplicationChecker { - private static final Log LOG = LogFactory.getLog(ReplicationChecker.class); - private final ZooKeeperWatcher zkw; - private ErrorReporter errorReporter; - private ReplicationQueuesClient queuesClient; - private ReplicationPeers replicationPeers; - private ReplicationQueueDeletor queueDeletor; + private final ErrorReporter errorReporter; // replicator with its queueIds for removed peers private Map> undeletedQueueIds = new HashMap>(); // Set of un deleted hfile refs queue Ids private Set undeletedHFileRefsQueueIds = new HashSet<>(); - private final String hfileRefsZNode; + private final ReplicationZKNodeCleaner cleaner; public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, HConnection connection, ErrorReporter errorReporter) throws IOException { - try { - this.zkw = zkw; - this.errorReporter = errorReporter; - this.queuesClient = ReplicationFactory.getReplicationQueuesClient(zkw, conf, connection); - this.queuesClient.init(); - this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient, - connection); - this.replicationPeers.init(); - this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection); - } catch (ReplicationException e) { - throw new IOException("failed to construct ReplicationChecker", e); - } - - String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication"); - String replicationZNode = ZKUtil.joinZNode(this.zkw.baseZNode, replicationZNodeName); - String hfileRefsZNodeName = - conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY, - ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT); - hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName); + this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection); + this.errorReporter = errorReporter; } public boolean hasUnDeletedQueues() { - return errorReporter.getErrorList() - .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); + return errorReporter.getErrorList().contains( + HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE); } public void checkUnDeletedQueues() throws IOException { - Set peerIds = new HashSet(this.replicationPeers.getAllPeerIds()); - try { - List replicators = this.queuesClient.getListOfReplicators(); - for (String replicator : replicators) { - List queueIds = this.queuesClient.getAllQueues(replicator); - for (String queueId : queueIds) { - ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); - if (!peerIds.contains(queueInfo.getPeerId())) { - if (!undeletedQueueIds.containsKey(replicator)) { - undeletedQueueIds.put(replicator, new ArrayList()); - } - undeletedQueueIds.get(replicator).add(queueId); - - String msg = "Undeleted replication queue for removed peer found: " - + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", - queueInfo.getPeerId(), replicator, queueId); - errorReporter.reportError( - HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); - } - } - } - } catch (KeeperException ke) { - throw new IOException(ke); - } - - checkUnDeletedHFileRefsQueues(peerIds); - } - - private void checkUnDeletedHFileRefsQueues(Set peerIds) throws IOException { - try { - if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) { - return; - } - List listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue(); - Set peers = new HashSet<>(listOfPeers); - peers.removeAll(peerIds); - if (!peers.isEmpty()) { - undeletedHFileRefsQueueIds.addAll(peers); - String msg = - "Undeleted replication hfile-refs queue for removed peer found: " - + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode; + undeletedQueueIds = cleaner.getUnDeletedQueues(); + for (Entry> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { + String replicator = replicatorAndQueueIds.getKey(); + for (String queueId : replicatorAndQueueIds.getValue()) { + ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId); + String msg = "Undeleted replication queue for removed peer found: " + + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(), + replicator, queueId); errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); } - } catch (KeeperException e) { - throw new IOException("Failed to get list of all peers from hfile-refs znode " - + hfileRefsZNode, e); } + + checkUnDeletedHFileRefsQueues(); } - private static class ReplicationQueueDeletor extends ReplicationStateZKBase { - public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) { - super(zk, conf, abortable); - } - - public void removeQueue(String replicator, String queueId) throws IOException { - String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator), - queueId); - try { - ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath); - LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId); - } catch (KeeperException e) { - throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: " - + queueId); - } + private void checkUnDeletedHFileRefsQueues() throws IOException { + undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues(); + if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) { + String msg = "Undeleted replication hfile-refs queue for removed peer found: " + + undeletedHFileRefsQueueIds + " under hfile-refs node"; + errorReporter + .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg); } } public void fixUnDeletedQueues() throws IOException { - for (Entry> replicatorAndQueueIds : undeletedQueueIds.entrySet()) { - String replicator = replicatorAndQueueIds.getKey(); - for (String queueId : replicatorAndQueueIds.getValue()) { - queueDeletor.removeQueue(replicator, queueId); - } + if (!undeletedQueueIds.isEmpty()) { + cleaner.removeQueues(undeletedQueueIds); } fixUnDeletedHFileRefsQueue(); } private void fixUnDeletedHFileRefsQueue() throws IOException { - for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) { - String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId); - try { - ZKUtil.deleteNodeRecursively(this.zkw, node); - LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path " - + hfileRefsZNode); - } catch (KeeperException e) { - throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId - + " from path " + hfileRefsZNode); - } + if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) { + cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds); } } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java new file mode 100644 index 00000000000..6cb4973e77a --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.master.cleaner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.replication.ReplicationFactory; +import org.apache.hadoop.hbase.replication.ReplicationQueues; +import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestReplicationZKNodeCleaner { + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private final String ID_ONE = "1"; + private final String SERVER_ONE = "server1"; + private final String ID_TWO = "2"; + private final String SERVER_TWO = "server2"; + + private final Configuration conf; + private final ZooKeeperWatcher zkw; + private final ReplicationQueues repQueues; + + public TestReplicationZKNodeCleaner() throws Exception { + conf = TEST_UTIL.getConfiguration(); + zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null); + repQueues = ReplicationFactory.getReplicationQueues(zkw, conf, null); + assertTrue(repQueues instanceof ReplicationQueuesZKImpl); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000); + TEST_UTIL.startMiniCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testReplicationZKNodeCleaner() throws Exception { + repQueues.init(SERVER_ONE); + // add queue for ID_ONE which isn't exist + repQueues.addLog(ID_ONE, "file1"); + + ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null); + Map> undeletedQueues = cleaner.getUnDeletedQueues(); + assertEquals(1, undeletedQueues.size()); + assertTrue(undeletedQueues.containsKey(SERVER_ONE)); + assertEquals(1, undeletedQueues.get(SERVER_ONE).size()); + assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE)); + + // add a recovery queue for ID_TWO which isn't exist + repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2"); + + undeletedQueues = cleaner.getUnDeletedQueues(); + assertEquals(1, undeletedQueues.size()); + assertTrue(undeletedQueues.containsKey(SERVER_ONE)); + assertEquals(2, undeletedQueues.get(SERVER_ONE).size()); + assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE)); + assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO)); + + cleaner.removeQueues(undeletedQueues); + undeletedQueues = cleaner.getUnDeletedQueues(); + assertEquals(0, undeletedQueues.size()); + } + + @Test + public void testReplicationZKNodeCleanerChore() throws Exception { + repQueues.init(SERVER_ONE); + // add queue for ID_ONE which isn't exist + repQueues.addLog(ID_ONE, "file1"); + // add a recovery queue for ID_TWO which isn't exist + repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2"); + + // Wait the cleaner chore to run + Thread.sleep(20000); + + ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null); + assertEquals(0, cleaner.getUnDeletedQueues().size()); + } +}