From 67420fe21dfd104f23fd74f83f955dea2f971e71 Mon Sep 17 00:00:00 2001
From: tedyu
Date: Sat, 10 Dec 2016 04:33:40 -0800
Subject: [PATCH] HBASE-16336 Removing peers seems to be leaving spare queues
 (Guanghao Zhang)

---
 .../apache/hadoop/hbase/master/HMaster.java    |  17 ++
 .../cleaner/ReplicationZKNodeCleaner.java      | 202 ++++++++++++++++++
 .../ReplicationZKNodeCleanerChore.java         |  55 +++++
 .../hbase/util/hbck/ReplicationChecker.java    | 143 +++----------
 .../cleaner/TestReplicationZKNodeCleaner.java  | 115 ++++++++++
 5 files changed, 417 insertions(+), 115 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 970744daa38..167a029267d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -96,6 +96,8 @@ import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.cleaner.ReplicationMetaCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
 import org.apache.hadoop.hbase.master.normalizer.RegionNormalizer;
@@ -135,6 +137,8 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionSplitPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.FIFOCompactionPolicy;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
 import org.apache.hadoop.hbase.replication.master.TableCFsUpdater;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -320,6 +324,7 @@ public class HMaster extends HRegionServer implements MasterServices {
   CatalogJanitor catalogJanitorChore;
   private ReplicationMetaCleaner replicationMetaCleaner;
+  private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1008,6 +1013,17 @@ public class HMaster extends HRegionServer implements MasterServices {
       LOG.trace("Started service threads");
     }
 
+    // Start replication zk node cleaner
+    if (conf.getClass("hbase.region.replica.replication.replicationQueues.class",
+        ReplicationFactory.defaultReplicationQueueClass) == ReplicationQueuesZKImpl.class) {
+      try {
+        replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
+            new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
+        getChoreService().scheduleChore(replicationZKNodeCleanerChore);
+      } catch (Exception e) {
+        LOG.error("Failed to start replicationZKNodeCleanerChore", e);
+      }
+    }
     replicationMetaCleaner = new ReplicationMetaCleaner(this, this, cleanerInterval);
     getChoreService().scheduleChore(replicationMetaCleaner);
   }
@@ -1043,6 +1059,7 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Clean up and close up shop
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
+    if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
    if (this.replicationMetaCleaner != null) this.replicationMetaCleaner.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
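
Note on the guard above: the chore is only scheduled when the cluster uses the ZooKeeper-backed queue implementation (ReplicationQueuesZKImpl); with any other ReplicationQueues implementation there are no replication queue znodes to clean. The sketch below mirrors the wiring HMaster performs, as a standalone illustration only; the "cleaner-demo" names, the one-minute period, the null Abortable, and the stub Stoppable are assumptions of this example, not part of the patch (HMaster passes itself and cleanerInterval):

    // Illustrative wiring only; run inside a method declared to throw Exception.
    Configuration conf = HBaseConfiguration.create();
    ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "cleaner-demo", null);
    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
    Stoppable stopper = new Stoppable() { // stub stopper for the sketch
      private volatile boolean stopped = false;
      @Override public void stop(String why) { stopped = true; }
      @Override public boolean isStopped() { return stopped; }
    };
    ChoreService choreService = new ChoreService("cleaner-demo");
    choreService.scheduleChore(new ReplicationZKNodeCleanerChore(stopper, 60 * 1000, cleaner));
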
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
new file mode 100644
index 00000000000..c0a1b753f95
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Used to clean the replication queues that belong to peers which no longer exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleaner {
+  private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleaner.class);
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueuesClient queuesClient;
+  private final ReplicationPeers replicationPeers;
+  private final ReplicationQueueDeletor queueDeletor;
+
+  public ReplicationZKNodeCleaner(Configuration conf, ZooKeeperWatcher zkw, Abortable abortable)
+      throws IOException {
+    try {
+      this.zkw = zkw;
+      this.queuesClient = ReplicationFactory
+          .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
+      this.queuesClient.init();
+      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
+        abortable);
+      this.replicationPeers.init();
+      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
+    } catch (Exception e) {
+      throw new IOException("Failed to construct ReplicationZKNodeCleaner", e);
+    }
+  }
+
+  /**
+   * @return replicators (region server names) mapped to the ids of their undeleted queues that
+   *         belong to removed peers
+   * @throws IOException
+   */
+  public Map<String, List<String>> getUnDeletedQueues() throws IOException {
+    Map<String, List<String>> undeletedQueues = new HashMap<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    try {
+      List<String> replicators = this.queuesClient.getListOfReplicators();
+      for (String replicator : replicators) {
+        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
+        for (String queueId : queueIds) {
+          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+          if (!peerIds.contains(queueInfo.getPeerId())) {
+            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<String>()).add(
+              queueId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Undeleted replication queue for removed peer found: "
+                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
+                    queueInfo.getPeerId(), replicator, queueId));
+            }
+          }
+        }
+      }
+    } catch (KeeperException ke) {
+      throw new IOException("Failed to get the replication queues of all replicators", ke);
+    }
+    return undeletedQueues;
+  }
+
+  /**
+   * @return the ids of undeleted hfile-refs queues that belong to removed peers, or null if the
+   *         hfile-refs znode does not exist
+   * @throws IOException
+   */
+  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
+    Set<String> undeletedHFileRefsQueue = new HashSet<>();
+    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
+    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
+    try {
+      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
+        return null;
+      }
+      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
+      Set<String> peers = new HashSet<>(listOfPeers);
+      peers.removeAll(peerIds);
+      if (!peers.isEmpty()) {
+        undeletedHFileRefsQueue.addAll(peers);
+      }
+    } catch (KeeperException e) {
+      throw new IOException("Failed to get list of all peers from hfile-refs znode "
+          + hfileRefsZNode, e);
+    }
+    return undeletedHFileRefsQueue;
+  }
+
+  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
+
+    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
+      super(zk, conf, abortable);
+    }
+
+    /**
+     * @param replicator The region server that owns the undeleted queue
+     * @param queueId The id of the undeleted queue
+     * @throws IOException
+     */
+    public void removeQueue(final String replicator, final String queueId) throws IOException {
+      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
+        queueId);
+      try {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
+          LOG.info("Successfully removed replication queue, replicator: " + replicator
+              + ", queueId: " + queueId);
+        }
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+            + queueId, e);
+      }
+    }
+
+    /**
+     * @param hfileRefsQueueId The id of the undeleted hfile-refs queue
+     * @throws IOException
+     */
+    public void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException {
+      String node = ZKUtil.joinZNode(this.hfileRefsZNode, hfileRefsQueueId);
+      try {
+        if (!replicationPeers.getAllPeerIds().contains(hfileRefsQueueId)) {
+          ZKUtil.deleteNodeRecursively(this.zookeeper, node);
+          LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path "
+              + hfileRefsZNode);
+        }
+      } catch (KeeperException e) {
+        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
+            + " from path " + hfileRefsZNode, e);
+      }
+    }
+
+    String getHfileRefsZNode() {
+      return this.hfileRefsZNode;
+    }
+  }
+
+  /**
+   * Remove the znodes of undeleted replication queues that belong to removed peers.
+   * @param undeletedQueues replicators mapped to the ids of their undeleted queues
+   * @throws IOException
+   */
+  public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
+    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        queueDeletor.removeQueue(replicator, queueId);
+      }
+    }
+  }
+
+  /**
+   * Remove the znodes of undeleted hfile-refs queues that belong to removed peers.
+   * @param undeletedHFileRefsQueues the ids of the undeleted hfile-refs queues
+   * @throws IOException
+   */
+  public void removeHFileRefsQueues(final Set<String> undeletedHFileRefsQueues) throws IOException {
+    for (String hfileRefsQueueId : undeletedHFileRefsQueues) {
+      queueDeletor.removeHFileRefsQueue(hfileRefsQueueId);
+    }
+  }
+}
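
A note on how the cleaner identifies orphans: a queue id znode name is the bare peer id for an active queue, while a queue claimed from a dead region server carries the dead server's name as a suffix. ReplicationQueueInfo parses the owning peer id out of either form, and getUnDeletedQueues() flags the queue once that peer id is gone. A minimal illustration (the literal ids are examples only):

    // ReplicationQueueInfo extracts the owning peer id from a queue id znode name.
    ReplicationQueueInfo active = new ReplicationQueueInfo("2");          // active queue of peer 2
    ReplicationQueueInfo claimed = new ReplicationQueueInfo("2-server2"); // claimed from server2
    assert active.getPeerId().equals("2");
    assert claimed.getPeerId().equals("2");
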
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
new file mode 100644
index 00000000000..4bc1244f564
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Chore that cleans the replication queues belonging to peers which no longer exist.
+ */
+@InterfaceAudience.Private
+public class ReplicationZKNodeCleanerChore extends ScheduledChore {
+  private static final Log LOG = LogFactory.getLog(ReplicationZKNodeCleanerChore.class);
+  private final ReplicationZKNodeCleaner cleaner;
+
+  public ReplicationZKNodeCleanerChore(Stoppable stopper, int period,
+      ReplicationZKNodeCleaner cleaner) {
+    super("ReplicationZKNodeCleanerChore", stopper, period);
+    this.cleaner = cleaner;
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+      cleaner.removeQueues(undeletedQueues);
+    } catch (IOException e) {
+      LOG.warn("Failed to clean replication znodes", e);
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 4a430ec9ef5..4815f632298 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.util.hbck;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -27,161 +26,75 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
 
 /*
  * Check and fix undeleted replication queues for removed peerId.
 */
 @InterfaceAudience.Private
 public class ReplicationChecker {
-  private static final Log LOG = LogFactory.getLog(ReplicationChecker.class);
-  private final ZooKeeperWatcher zkw;
   private final ErrorReporter errorReporter;
-  private final ReplicationQueuesClient queuesClient;
-  private final ReplicationPeers replicationPeers;
-  private final ReplicationQueueDeletor queueDeletor;
   // replicator with its queueIds for removed peers
-  private final Map<String, List<String>> undeletedQueueIds = new HashMap<>();
+  private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
   // replicator with its undeleted queueIds for removed peers in hfile-refs queue
   private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
-  private final String hfileRefsZNode;
+  private final ReplicationZKNodeCleaner cleaner;
 
   public ReplicationChecker(Configuration conf, ZooKeeperWatcher zkw, ClusterConnection connection,
       ErrorReporter errorReporter) throws IOException {
-    try {
-      this.zkw = zkw;
-      this.errorReporter = errorReporter;
-      this.queuesClient = ReplicationFactory.getReplicationQueuesClient(
-        new ReplicationQueuesClientArguments(conf, connection, zkw));
-      this.queuesClient.init();
-      this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
-        connection);
-      this.replicationPeers.init();
-      this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, connection);
-    } catch (Exception e) {
-      throw new IOException("failed to construct ReplicationChecker", e);
-    }
-
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
-    String replicationZNode = ZKUtil.joinZNode(this.zkw.znodePaths.baseZNode, replicationZNodeName);
-    String hfileRefsZNodeName =
-        conf.get(ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
-          ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
-    hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
+    this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
+    this.errorReporter = errorReporter;
   }
 
   public boolean hasUnDeletedQueues() {
-    return errorReporter.getErrorList()
-        .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+    return errorReporter.getErrorList().contains(
+      HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
   }
 
   public void checkUnDeletedQueues() throws IOException {
-    Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
-    try {
-      List<String> replicators = this.queuesClient.getListOfReplicators();
-      for (String replicator : replicators) {
-        List<String> queueIds = this.queuesClient.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (!peerIds.contains(queueInfo.getPeerId())) {
-            if (!undeletedQueueIds.containsKey(replicator)) {
-              undeletedQueueIds.put(replicator, new ArrayList<String>());
-            }
-            undeletedQueueIds.get(replicator).add(queueId);
-
-            String msg = "Undeleted replication queue for removed peer found: "
-                + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
-                  queueInfo.getPeerId(), replicator, queueId);
-            errorReporter.reportError(
-              HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
-          }
-        }
-      }
-    } catch (KeeperException ke) {
-      throw new IOException(ke);
-    }
-
-    checkUnDeletedHFileRefsQueues(peerIds);
-  }
-
-  private void checkUnDeletedHFileRefsQueues(Set<String> peerIds) throws IOException {
-    try {
-      if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
-        return;
-      }
-      List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
-      Set<String> peers = new HashSet<>(listOfPeers);
-      peers.removeAll(peerIds);
-      if (!peers.isEmpty()) {
-        undeletedHFileRefsQueueIds.addAll(peers);
-        String msg =
-            "Undeleted replication hfile-refs queue for removed peer found: "
-                + undeletedHFileRefsQueueIds + " under hfile-refs node " + hfileRefsZNode;
+    undeletedQueueIds = cleaner.getUnDeletedQueues();
+    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+      String replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        String msg = "Undeleted replication queue for removed peer found: "
+            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
+              replicator, queueId);
         errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
           msg);
       }
-    } catch (KeeperException e) {
-      throw new IOException("Failed to get list of all peers from hfile-refs znode "
-          + hfileRefsZNode, e);
     }
+
+    checkUnDeletedHFileRefsQueues();
   }
 
-  private static class ReplicationQueueDeletor extends ReplicationStateZKBase {
-    public ReplicationQueueDeletor(ZooKeeperWatcher zk, Configuration conf, Abortable abortable) {
-      super(zk, conf, abortable);
-    }
-
-    public void removeQueue(String replicator, String queueId) throws IOException {
-      String queueZnodePath = ZKUtil.joinZNode(ZKUtil.joinZNode(this.queuesZNode, replicator),
-        queueId);
-      try {
-        ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
-        LOG.info("remove replication queue, replicator: " + replicator + ", queueId: " + queueId);
-      } catch (KeeperException e) {
-        throw new IOException("failed to delete queue, replicator: " + replicator + ", queueId: "
-            + queueId);
-      }
+  private void checkUnDeletedHFileRefsQueues() throws IOException {
+    undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
+    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
+      String msg = "Undeleted replication hfile-refs queue for removed peer found: "
+          + undeletedHFileRefsQueueIds + " under hfile-refs node";
+      errorReporter
+          .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
     }
   }
 
   public void fixUnDeletedQueues() throws IOException {
-    for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
-      String replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
-        queueDeletor.removeQueue(replicator, queueId);
-      }
+    if (!undeletedQueueIds.isEmpty()) {
+      cleaner.removeQueues(undeletedQueueIds);
     }
     fixUnDeletedHFileRefsQueue();
   }
 
   private void fixUnDeletedHFileRefsQueue() throws IOException {
-    for (String hfileRefsQueueId : undeletedHFileRefsQueueIds) {
-      String node = ZKUtil.joinZNode(hfileRefsZNode, hfileRefsQueueId);
-      try {
-        ZKUtil.deleteNodeRecursively(this.zkw, node);
-        LOG.info("Successfully deleted hfile-refs queue " + hfileRefsQueueId + " from path "
-            + hfileRefsZNode);
-      } catch (KeeperException e) {
-        throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId
-            + " from path " + hfileRefsZNode);
-      }
+    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
+      cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
     }
   }
 }
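
With this refactor ReplicationChecker becomes a thin adapter: detection and deletion both live in ReplicationZKNodeCleaner, and hbck only contributes error reporting. A hedged sketch of the call sequence hbck drives (the conf, zkw, connection, and errors variables are assumed to be set up by HBaseFsck):

    ReplicationChecker checker = new ReplicationChecker(conf, zkw, connection, errors);
    checker.checkUnDeletedQueues();  // reports UNDELETED_REPLICATION_QUEUE per orphaned queue
    if (checker.hasUnDeletedQueues()) {
      checker.fixUnDeletedQueues();  // delegates deletion to ReplicationZKNodeCleaner
    }
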
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
new file mode 100644
index 00000000000..e11143d18c0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MasterTests.class, MediumTests.class })
+public class TestReplicationZKNodeCleaner {
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private final String ID_ONE = "1";
+  private final String SERVER_ONE = "server1";
+  private final String ID_TWO = "2";
+  private final String SERVER_TWO = "server2";
+
+  private final Configuration conf;
+  private final ZooKeeperWatcher zkw;
+  private final ReplicationQueues repQueues;
+
+  public TestReplicationZKNodeCleaner() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    zkw = new ZooKeeperWatcher(conf, "TestReplicationZKNodeCleaner", null);
+    repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
+        zkw));
+    assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
+    TEST_UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testReplicationZKNodeCleaner() throws Exception {
+    repQueues.init(SERVER_ONE);
+    // add a queue for ID_ONE, a peer that does not exist
+    repQueues.addLog(ID_ONE, "file1");
+
+    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
+    Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(1, undeletedQueues.size());
+    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+    assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+
+    // add a recovered queue for ID_TWO, a peer that does not exist
+    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+    undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(1, undeletedQueues.size());
+    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
+    assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
+    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
+
+    cleaner.removeQueues(undeletedQueues);
+    undeletedQueues = cleaner.getUnDeletedQueues();
+    assertEquals(0, undeletedQueues.size());
+  }
+
+  @Test
+  public void testReplicationZKNodeCleanerChore() throws Exception {
+    repQueues.init(SERVER_ONE);
+    // add a queue for ID_ONE, a peer that does not exist
+    repQueues.addLog(ID_ONE, "file1");
+    // add a recovered queue for ID_TWO, a peer that does not exist
+    repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
+
+    // Wait for the cleaner chore to run
+    Thread.sleep(20000);
+
+    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
+    assertEquals(0, cleaner.getUnDeletedQueues().size());
+  }
+}
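
One possible hardening of testReplicationZKNodeCleanerChore above, which assumes the 10-second chore interval always fires within the fixed 20-second sleep: poll for the cleanup instead. A sketch using the Waiter support available through HBaseTestingUtility (the 60-second timeout is an assumption of this example, not part of the patch):

    // Hedged alternative to Thread.sleep(20000): wait until the chore has cleaned up.
    final ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
    TEST_UTIL.waitFor(60000, new Waiter.Predicate<Exception>() {
      @Override
      public boolean evaluate() throws Exception {
        return cleaner.getUnDeletedQueues().isEmpty();
      }
    });
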