From b84fbde17564878050ebb6fd7ca8dd5ce583dd93 Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Wed, 3 Jan 2018 09:39:44 +0800
Subject: [PATCH] HBASE-19687 Move the logic in ReplicationZKNodeCleaner to
 ReplicationChecker and remove ReplicationZKNodeCleanerChore

---
 .../replication/VerifyReplication.java        |   6 +-
 .../hbase/replication/ReplicationPeers.java   |  26 +--
 .../hbase/replication/ReplicationUtils.java   |  38 ++++
 .../TestReplicationStateBasic.java            |   2 +-
 .../apache/hadoop/hbase/master/HMaster.java   |  13 --
 .../cleaner/ReplicationZKNodeCleaner.java     | 192 ------------------
 .../ReplicationZKNodeCleanerChore.java        |  54 -----
 .../replication/ReplicationPeerManager.java   |  18 +-
 .../apache/hadoop/hbase/util/HBaseFsck.java   |  11 +-
 .../hbase/util/hbck/ReplicationChecker.java   | 111 ++++++----
 .../cleaner/TestReplicationZKNodeCleaner.java | 115 ----------
 .../hbase/util/TestHBaseFsckReplication.java  | 101 +++++++++
 .../hbase/util/hbck/HbckTestingUtil.java      |   6 +-
 13 files changed, 227 insertions(+), 466 deletions(-)
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
 delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java
 delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index fe45762212e..fac487576c3 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -345,10 +345,10 @@ public class VerifyReplication extends Configured implements Tool {
         }
       });
       ReplicationPeerStorage storage =
-          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
       ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
       return Pair.newPair(peerConfig,
-        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
+        ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf));
     } catch (ReplicationException e) {
       throw new IOException("An error occurred while trying to connect to the remote peer cluster",
         e);
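Note: the hunk above only changes where the helper lives; resolving a peer's full cluster configuration still goes through the peer storage. A minimal standalone sketch of the same lookup, assuming a local ZKWatcher and a peer registered under id "1" (both illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public class PeerConfigLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Watcher on the local cluster's ZooKeeper; the identifier string is arbitrary.
    try (ZKWatcher zkw = new ZKWatcher(conf, "PeerConfigLookup", null)) {
      ReplicationPeerStorage storage =
          ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
      // Stored peer config -> full Configuration for the remote cluster
      // (cluster key expanded, per-peer overrides applied on top).
      ReplicationPeerConfig peerConfig = storage.getPeerConfig("1");
      Configuration peerConf = ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf);
      System.out.println(peerConf.get("hbase.zookeeper.quorum"));
    }
  }
}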
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 45940a5454f..fcbc350740e 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -17,14 +17,11 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -106,25 +103,6 @@ public class ReplicationPeers {
     return Collections.unmodifiableSet(peerCache.keySet());
   }
 
-  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
-      Configuration baseConf) throws ReplicationException {
-    Configuration otherConf;
-    try {
-      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
-    } catch (IOException e) {
-      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
-    }
-
-    if (!peerConfig.getConfiguration().isEmpty()) {
-      CompoundConfiguration compound = new CompoundConfiguration();
-      compound.add(otherConf);
-      compound.addStringMap(peerConfig.getConfiguration());
-      return compound;
-    }
-
-    return otherConf;
-  }
-
   public PeerState refreshPeerState(String peerId) throws ReplicationException {
     ReplicationPeerImpl peer = peerCache.get(peerId);
     if (peer == null) {
@@ -158,7 +136,7 @@ public class ReplicationPeers {
   private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
     ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
     boolean enabled = peerStorage.isPeerEnabled(peerId);
-    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
-      peerConfig);
+    return new ReplicationPeerImpl(ReplicationUtils.getPeerClusterConfiguration(peerConfig, conf),
+      peerId, enabled, peerConfig);
   }
 }
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 2a6069c7d7a..be70e6eefbf 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -17,8 +17,13 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompoundConfiguration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -38,4 +43,37 @@ public final class ReplicationUtils {
     return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
   }
+
+  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
+      Configuration baseConf) throws ReplicationException {
+    Configuration otherConf;
+    try {
+      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
+    } catch (IOException e) {
+      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
+    }
+
+    if (!peerConfig.getConfiguration().isEmpty()) {
+      CompoundConfiguration compound = new CompoundConfiguration();
+      compound.add(otherConf);
+      compound.addStringMap(peerConfig.getConfiguration());
+      return compound;
+    }
+
+    return otherConf;
+  }
+
+  public static void removeAllQueues(ReplicationQueueStorage queueStorage, String peerId)
+      throws ReplicationException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (queueInfo.getPeerId().equals(peerId)) {
+          queueStorage.removeQueue(replicator, queueId);
+        }
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
+    }
+  }
 }
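The merge order in getPeerClusterConfiguration matters: the per-peer map from ReplicationPeerConfig.getConfiguration() is added after the cluster-key configuration, so peer-level overrides win. A small sketch of that precedence using CompoundConfiguration directly (the key name is a placeholder chosen for illustration):

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompoundConfiguration;

public class PrecedenceDemo {
  public static void main(String[] args) {
    Configuration base = new Configuration(false);
    base.set("replication.source.ratio", "0.1");

    Map<String, String> peerOverrides = new HashMap<>();
    peerOverrides.put("replication.source.ratio", "0.5");

    // CompoundConfiguration resolves against the most recently added source
    // first, so the peer override shadows the base value.
    CompoundConfiguration compound = new CompoundConfiguration();
    compound.add(base);
    compound.addStringMap(peerOverrides);

    System.out.println(compound.get("replication.source.ratio")); // prints 0.5
  }
}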
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index f3eecccf981..fccffb5d802 100644
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -240,7 +240,7 @@ public abstract class TestReplicationStateBasic {
     rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
     assertNumberOfPeers(2);
 
-    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
       .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
     rp.getPeerStorage().removePeer(ID_ONE);
     rp.removePeer(ID_ONE);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 034b71ce510..5960af3a7de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleanerChore;
 import org.apache.hadoop.hbase.master.locking.LockManager;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan;
 import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
@@ -364,8 +362,6 @@ public class HMaster extends HRegionServer implements MasterServices {
   private ClusterStatusPublisher clusterStatusPublisherChore = null;
 
   CatalogJanitor catalogJanitorChore;
-
-  private ReplicationZKNodeCleanerChore replicationZKNodeCleanerChore;
   private LogCleaner logCleaner;
   private HFileCleaner hfileCleaner;
   private ExpiredMobFileCleanerChore expiredMobFileCleanerChore;
@@ -1166,14 +1162,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     if (LOG.isTraceEnabled()) {
       LOG.trace("Started service threads");
     }
-    // Start replication zk node cleaner
-    try {
-      replicationZKNodeCleanerChore = new ReplicationZKNodeCleanerChore(this, cleanerInterval,
-          new ReplicationZKNodeCleaner(this.conf, this.getZooKeeper(), this));
-      getChoreService().scheduleChore(replicationZKNodeCleanerChore);
-    } catch (Exception e) {
-      LOG.error("start replicationZKNodeCleanerChore failed", e);
-    }
   }
 
   @Override
@@ -1196,7 +1184,6 @@ public class HMaster extends HRegionServer implements MasterServices {
     // Clean up and close up shop
     if (this.logCleaner != null) this.logCleaner.cancel(true);
     if (this.hfileCleaner != null) this.hfileCleaner.cancel(true);
-    if (this.replicationZKNodeCleanerChore != null) this.replicationZKNodeCleanerChore.cancel(true);
     if (this.quotaManager != null) this.quotaManager.stop();
 
     if (this.activeMasterManager != null) this.activeMasterManager.stop();
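With the chore and its cleaner removed from the master, nothing cleans these queues periodically any more; the check becomes an on-demand hbck operation. A sketch of the replacement flow using the ReplicationChecker API introduced later in this patch (the conf, zkw and errorReporter variables are assumed to be in scope):

// Equivalent of the removed chore body, now driven explicitly by hbck.
ReplicationChecker checker = new ReplicationChecker(conf, zkw, errorReporter);
checker.checkUnDeletedQueues();   // records one error per orphaned queue
if (checker.hasUnDeletedQueues()) {
  checker.fixUnDeletedQueues();   // removes the queues, replicator by replicator
}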
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
deleted file mode 100644
index f2c3ec98d37..00000000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to clean the replication queues belonging to peers that no longer exist.
- */
-@InterfaceAudience.Private
-public class ReplicationZKNodeCleaner {
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
-  private final ReplicationQueueStorage queueStorage;
-  private final ReplicationPeerStorage peerStorage;
-  private final ReplicationQueueDeletor queueDeletor;
-
-  public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
-      throws IOException {
-    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
-    this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
-  }
-
-  /**
-   * @return undeletedQueues replicator with its queueIds for removed peers
-   * @throws IOException
-   */
-  public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
-    Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
-    try {
-      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
-      List<ServerName> replicators = this.queueStorage.getListOfReplicators();
-      if (replicators == null || replicators.isEmpty()) {
-        return undeletedQueues;
-      }
-      for (ServerName replicator : replicators) {
-        List<String> queueIds = this.queueStorage.getAllQueues(replicator);
-        for (String queueId : queueIds) {
-          ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-          if (!peerIds.contains(queueInfo.getPeerId())) {
-            undeletedQueues.computeIfAbsent(replicator, (key) -> new ArrayList<>()).add(queueId);
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Undeleted replication queue for removed peer found: "
-                  + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]",
-                    queueInfo.getPeerId(), replicator, queueId));
-            }
-          }
-        }
-      }
-    } catch (ReplicationException ke) {
-      throw new IOException("Failed to get the replication queues of all replicators", ke);
-    }
-    return undeletedQueues;
-  }
-
-  /**
-   * @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
-   *         hfile-refs queue
-   */
-  public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
-    Set<String> undeletedHFileRefsQueue = new HashSet<>();
-    String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
-    try {
-      Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
-      List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
-      Set<String> peers = new HashSet<>(listOfPeers);
-      peers.removeAll(peerIds);
-      if (!peers.isEmpty()) {
-        undeletedHFileRefsQueue.addAll(peers);
-      }
-    } catch (ReplicationException e) {
-      throw new IOException("Failed to get list of all peers from hfile-refs znode "
-          + hfileRefsZNode, e);
-    }
-    return undeletedHFileRefsQueue;
-  }
-
-  private class ReplicationQueueDeletor extends ReplicationStateZKBase {
-
-    ReplicationQueueDeletor(ZKWatcher zk, Configuration conf, Abortable abortable) {
-      super(zk, conf, abortable);
-    }
-
-    /**
-     * @param replicator The regionserver which has undeleted queue
-     * @param queueId The undeleted queue id
-     */
-    void removeQueue(final ServerName replicator, final String queueId) throws IOException {
-      String queueZnodePath =
-          ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()),
-            queueId);
-      try {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (!peerStorage.listPeerIds().contains(queueInfo.getPeerId())) {
-          ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
-          LOG.info("Successfully removed replication queue, replicator: " + replicator
-              + ", queueId: " + queueId);
-        }
-      } catch (ReplicationException | KeeperException e) {
-        throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
-            + queueId);
-      }
-    }
IOException("Failed to delete queue, replicator: " + replicator + ", queueId: " - + queueId); - } - } - - /** - * @param hfileRefsQueueId The undeleted hfile-refs queue id - * @throws IOException - */ - void removeHFileRefsQueue(final String hfileRefsQueueId) throws IOException { - String node = ZNodePaths.joinZNode(this.hfileRefsZNode, hfileRefsQueueId); - try { - if (!peerStorage.listPeerIds().contains(hfileRefsQueueId)) { - ZKUtil.deleteNodeRecursively(this.zookeeper, node); - LOG.info("Successfully removed hfile-refs queue " + hfileRefsQueueId + " from path " - + hfileRefsZNode); - } - } catch (ReplicationException | KeeperException e) { - throw new IOException("Failed to delete hfile-refs queue " + hfileRefsQueueId - + " from path " + hfileRefsZNode, e); - } - } - - String getHfileRefsZNode() { - return this.hfileRefsZNode; - } - } - - /** - * Remove the undeleted replication queue's zk node for removed peers. - * @param undeletedQueues replicator with its queueIds for removed peers - * @throws IOException - */ - public void removeQueues(final Map> undeletedQueues) throws IOException { - for (Entry> replicatorAndQueueIds : undeletedQueues.entrySet()) { - ServerName replicator = replicatorAndQueueIds.getKey(); - for (String queueId : replicatorAndQueueIds.getValue()) { - queueDeletor.removeQueue(replicator, queueId); - } - } - } - - /** - * Remove the undeleted hfile-refs queue's zk node for removed peers. - * @param undeletedHFileRefsQueues replicator with its undeleted queueIds for removed peers in - * hfile-refs queue - * @throws IOException - */ - public void removeHFileRefsQueues(final Set undeletedHFileRefsQueues) throws IOException { - for (String hfileRefsQueueId : undeletedHFileRefsQueues) { - queueDeletor.removeHFileRefsQueue(hfileRefsQueueId); - } - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java deleted file mode 100644 index 19ca8041e12..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleanerChore.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.master.cleaner; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Stoppable; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Chore that will clean the replication queues belonging to the peer which does not exist. 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 1414d225ba9..696b2d7fcb1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
 import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -216,19 +217,6 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
-  private void removeAllQueues0(String peerId) throws ReplicationException {
-    for (ServerName replicator : queueStorage.getListOfReplicators()) {
-      List<String> queueIds = queueStorage.getAllQueues(replicator);
-      for (String queueId : queueIds) {
-        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        if (queueInfo.getPeerId().equals(peerId)) {
-          queueStorage.removeQueue(replicator, queueId);
-        }
-      }
-      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
-    }
-  }
-
   public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
     // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
     // on-going when the refresh peer config procedure is done. If a RS which has already been
     // scanned claims a queue from a RS that has not been scanned yet, a single pass will miss
     // that queue; but a queue will not be claimed once the refresh peer procedure is done (as
     // the next claim queue will just delete it), so we can make sure that a two pass scan will
     // finally find the queue and remove it, unless it has already been removed by others.
-    removeAllQueues0(peerId);
-    removeAllQueues0(peerId);
+    ReplicationUtils.removeAllQueues(queueStorage, peerId);
+    ReplicationUtils.removeAllQueues(queueStorage, peerId);
     queueStorage.removePeerFromHFileRefs(peerId);
   }
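The double call is the whole point of that comment: a single scan can race with a concurrent claimQueue. A hypothetical interleaving (the server names and recovered-queue id are invented for illustration; the "peerId-serverName" queue-id format matches the tests later in this patch):

// Pass 1 visits replicators in some order, say [rs1, rs2]:
//   t0: pass 1 scans rs1 -> sees no queue for the removed peer
//   t1: rs1 claims queue "1-rs2,16020,1234" from the dying rs2
//   t2: pass 1 scans rs2 -> the queue has already moved; pass 1 misses it
// Pass 2 re-scans and finds the claimed queue under rs1. Since no further
// claim can happen once the refresh-peer procedure is done, two passes suffice.
ReplicationUtils.removeAllQueues(queueStorage, peerId);
ReplicationUtils.removeAllQueues(queueStorage, peerId);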
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index fdb4fa4ea1f..9b6bf14e1dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -114,6 +114,7 @@ import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -752,7 +753,7 @@ public class HBaseFsck extends Configured implements Closeable {
    * @return 0 on success, non-zero on failure
    */
   public int onlineHbck()
-      throws IOException, KeeperException, InterruptedException {
+      throws IOException, KeeperException, InterruptedException, ReplicationException {
     // print hbase server version
     errors.print("Version: " + status.getHBaseVersion());
@@ -3576,8 +3577,8 @@ public class HBaseFsck extends Configured implements Closeable {
     return hbi;
   }
 
-  private void checkAndFixReplication() throws IOException {
-    ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, connection, errors);
+  private void checkAndFixReplication() throws ReplicationException {
+    ReplicationChecker checker = new ReplicationChecker(getConf(), zkw, errors);
     checker.checkUnDeletedQueues();
 
     if (checker.hasUnDeletedQueues() && this.fixReplication) {
@@ -4865,8 +4866,8 @@ public class HBaseFsck extends Configured implements Closeable {
   };
 
-  public HBaseFsck exec(ExecutorService exec, String[] args) throws KeeperException, IOException,
-      InterruptedException {
+  public HBaseFsck exec(ExecutorService exec, String[] args)
+      throws KeeperException, IOException, InterruptedException, ReplicationException {
     long sleepBeforeRerun = DEFAULT_SLEEP_BEFORE_RERUN;
     boolean checkCorruptHFiles = false;
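Because onlineHbck() and exec() now declare ReplicationException instead of wrapping it in IOException, callers that previously caught IOException must handle one more checked exception. A hedged sketch of such a call site (the fsck and LOG variables are assumed to be in scope):

try {
  // 0 on success, non-zero on failure, per the javadoc above.
  int ret = fsck.onlineHbck();
  System.exit(ret);
} catch (ReplicationException e) {
  // New with this patch: replication storage failures surface directly.
  LOG.error("Failed to access replication peer/queue storage", e);
  System.exit(1);
}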
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 85fa7297739..c08c6546426 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -17,84 +17,115 @@
  */
 package org.apache.hadoop.hbase.util.hbck;
 
-import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.util.HBaseFsck;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Check and fix undeleted replication queues for removed peerId.
  */
 @InterfaceAudience.Private
 public class ReplicationChecker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationChecker.class);
+
   private final ErrorReporter errorReporter;
   // replicator with its queueIds for removed peers
   private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
   // replicator with its undeleted queueIds for removed peers in hfile-refs queue
-  private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
-  private final ReplicationZKNodeCleaner cleaner;
+  private Set<String> undeletedHFileRefsPeerIds = new HashSet<>();
 
-  public ReplicationChecker(Configuration conf, ZKWatcher zkw, ClusterConnection connection,
-      ErrorReporter errorReporter) throws IOException {
-    this.cleaner = new ReplicationZKNodeCleaner(conf, zkw, connection);
+  private final ReplicationPeerStorage peerStorage;
+  private final ReplicationQueueStorage queueStorage;
+
+  public ReplicationChecker(Configuration conf, ZKWatcher zkw, ErrorReporter errorReporter) {
+    this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zkw, conf);
+    this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
     this.errorReporter = errorReporter;
   }
 
   public boolean hasUnDeletedQueues() {
-    return errorReporter.getErrorList().contains(
-      HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
+    return errorReporter.getErrorList()
+        .contains(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE);
   }
 
-  public void checkUnDeletedQueues() throws IOException {
-    undeletedQueueIds = cleaner.getUnDeletedQueues();
-    for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
-      ServerName replicator = replicatorAndQueueIds.getKey();
-      for (String queueId : replicatorAndQueueIds.getValue()) {
+  private Map<ServerName, List<String>> getUnDeletedQueues() throws ReplicationException {
+    Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
+    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      for (String queueId : queueStorage.getAllQueues(replicator)) {
         ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-        String msg = "Undeleted replication queue for removed peer found: "
-            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
-              replicator, queueId);
-        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
-          msg);
+        if (!peerIds.contains(queueInfo.getPeerId())) {
+          undeletedQueues.computeIfAbsent(replicator, key -> new ArrayList<>()).add(queueId);
+          LOG.debug(
+            "Undeleted replication queue for removed peer found: "
+                + "[removedPeerId={}, replicator={}, queueId={}]",
+            queueInfo.getPeerId(), replicator, queueId);
+        }
       }
     }
-
-    checkUnDeletedHFileRefsQueues();
+    return undeletedQueues;
   }
 
-  private void checkUnDeletedHFileRefsQueues() throws IOException {
-    undeletedHFileRefsQueueIds = cleaner.getUnDeletedHFileRefsQueues();
-    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
-      String msg = "Undeleted replication hfile-refs queue for removed peer found: "
-          + undeletedHFileRefsQueueIds + " under hfile-refs node";
-      errorReporter
-          .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg);
+  private Set<String> getUndeletedHFileRefsPeers() throws ReplicationException {
+    Set<String> undeletedHFileRefsPeerIds =
+        new HashSet<>(queueStorage.getAllPeersFromHFileRefsQueue());
+    Set<String> peerIds = new HashSet<>(peerStorage.listPeerIds());
+    undeletedHFileRefsPeerIds.removeAll(peerIds);
+    if (LOG.isDebugEnabled()) {
+      for (String peerId : undeletedHFileRefsPeerIds) {
+        LOG.debug("Undeleted replication hfile-refs queue for removed peer {} found", peerId);
+      }
     }
+    return undeletedHFileRefsPeerIds;
   }
 
-  public void fixUnDeletedQueues() throws IOException {
-    if (!undeletedQueueIds.isEmpty()) {
-      cleaner.removeQueues(undeletedQueueIds);
+  public void checkUnDeletedQueues() throws ReplicationException {
+    undeletedQueueIds = getUnDeletedQueues();
+    undeletedQueueIds.forEach((replicator, queueIds) -> {
+      queueIds.forEach(queueId -> {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        String msg = "Undeleted replication queue for removed peer found: "
+            + String.format("[removedPeerId=%s, replicator=%s, queueId=%s]", queueInfo.getPeerId(),
+              replicator, queueId);
+        errorReporter.reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE,
+          msg);
+      });
+    });
+    undeletedHFileRefsPeerIds = getUndeletedHFileRefsPeers();
+    undeletedHFileRefsPeerIds.stream()
+        .map(
+          peerId -> "Undeleted replication hfile-refs queue for removed peer " + peerId + " found")
+        .forEach(msg -> errorReporter
+            .reportError(HBaseFsck.ErrorReporter.ERROR_CODE.UNDELETED_REPLICATION_QUEUE, msg));
+  }
+
+  public void fixUnDeletedQueues() throws ReplicationException {
+    for (Map.Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds
+        .entrySet()) {
+      ServerName replicator = replicatorAndQueueIds.getKey();
+      for (String queueId : replicatorAndQueueIds.getValue()) {
+        queueStorage.removeQueue(replicator, queueId);
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
     }
-    fixUnDeletedHFileRefsQueue();
-  }
-
-  private void fixUnDeletedHFileRefsQueue() throws IOException {
-    if (undeletedHFileRefsQueueIds != null && !undeletedHFileRefsQueueIds.isEmpty()) {
-      cleaner.removeHFileRefsQueues(undeletedHFileRefsQueueIds);
+    for (String peerId : undeletedHFileRefsPeerIds) {
+      queueStorage.removePeerFromHFileRefs(peerId);
     }
   }
 }
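Everything in the checker hinges on ReplicationQueueInfo parsing the queue id: a plain queue id is just the peer id, while a recovered queue id appends the dead server's name after a dash, and getPeerId() returns the peer id in both cases. A small sketch (the queue-id strings follow the format used by the tests below):

import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;

public class QueueIdDemo {
  public static void main(String[] args) {
    // Plain queue for peer "2".
    ReplicationQueueInfo normal = new ReplicationQueueInfo("2");
    System.out.println(normal.getPeerId());           // 2
    // Recovered queue: still peer "2", claimed from a dead region server.
    ReplicationQueueInfo recovered = new ReplicationQueueInfo("2-server2,8000,1234");
    System.out.println(recovered.getPeerId());        // 2
    System.out.println(recovered.isQueueRecovered()); // true
  }
}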
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
deleted file mode 100644
index 4bcde0a5559..00000000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.master.cleaner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.testclassification.MasterTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ MasterTests.class, MediumTests.class })
-public class TestReplicationZKNodeCleaner {
-
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestReplicationZKNodeCleaner.class);
-
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
-  private final String ID_ONE = "1";
-  private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234);
-  private final String ID_TWO = "2";
-  private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234);
-
-  private final Configuration conf;
-  private final ZKWatcher zkw;
-  private final ReplicationQueueStorage repQueues;
-
-  public TestReplicationZKNodeCleaner() throws Exception {
-    conf = TEST_UTIL.getConfiguration();
-    zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
-    repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setInt("hbase.master.cleaner.interval", 10000);
-    TEST_UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  @Test
-  public void testReplicationZKNodeCleaner() throws Exception {
-    // add a queue for ID_ONE, a peer that does not exist
-    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
-
-    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
-    Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
-    assertEquals(1, undeletedQueues.size());
-    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
-    assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
-    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
-
-    // add a recovery queue for ID_TWO, a peer that does not exist
-    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
-
-    undeletedQueues = cleaner.getUnDeletedQueues();
-    assertEquals(1, undeletedQueues.size());
-    assertTrue(undeletedQueues.containsKey(SERVER_ONE));
-    assertEquals(2, undeletedQueues.get(SERVER_ONE).size());
-    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
-    assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_TWO + "-" + SERVER_TWO));
-
-    cleaner.removeQueues(undeletedQueues);
-    undeletedQueues = cleaner.getUnDeletedQueues();
-    assertEquals(0, undeletedQueues.size());
-  }
-
-  @Test
-  public void testReplicationZKNodeCleanerChore() throws Exception {
-    // add a queue for ID_ONE, a peer that does not exist
-    repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
-    // add a recovery queue for ID_TWO, a peer that does not exist
-    repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
-
-    // Wait for the cleaner chore to run
-    Thread.sleep(20000);
-
-    ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
-    assertEquals(0, cleaner.getUnDeletedQueues().size());
-  }
-}
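Both the deleted test above and its replacement below poke the same ZooKeeper layout through the queue-storage API. Roughly, assuming the default znode names (the actual root depends on zookeeper.znode.parent):

// /hbase/replication/peers/<peerId>                -> peer definitions
// /hbase/replication/rs/<server>/<queueId>/<wal>   -> per-replicator queues
// A queue whose <queueId> resolves to a peer id with no znode under
// .../peers is what gets reported as UNDELETED_REPLICATION_QUEUE.
queueStorage.addWAL(ServerName.valueOf("server1", 8000, 1234), "1", "file1");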
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
new file mode 100644
index 00000000000..e64255cb16d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsckReplication.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestHBaseFsckReplication {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    ReplicationPeerStorage peerStorage = ReplicationStorageFactory
+        .getReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+    ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+        .getReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+
+    String peerId1 = "1";
+    String peerId2 = "2";
+    peerStorage.addPeer(peerId1, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
+      true);
+    peerStorage.addPeer(peerId2, ReplicationPeerConfig.newBuilder().setClusterKey("key").build(),
+      true);
+    for (int i = 0; i < 10; i++) {
+      queueStorage.addWAL(ServerName.valueOf("localhost", 10000 + i, 100000 + i), peerId1,
+        "file-" + i);
+    }
+    queueStorage.addWAL(ServerName.valueOf("localhost", 10000, 100000), peerId2, "file");
+    HBaseFsck fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
+    HbckTestingUtil.assertNoErrors(fsck);
+
+    // should not remove anything since the replication peer is still alive
+    assertEquals(10, queueStorage.getListOfReplicators().size());
+    peerStorage.removePeer(peerId1);
+    // there should be orphan queues
+    assertEquals(10, queueStorage.getListOfReplicators().size());
+    fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), false);
+    HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
+      return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
+    }).limit(10).toArray(ERROR_CODE[]::new));
+
+    // should not delete anything when fix is false
+    assertEquals(10, queueStorage.getListOfReplicators().size());
+
+    fsck = HbckTestingUtil.doFsck(UTIL.getConfiguration(), true);
+    HbckTestingUtil.assertErrors(fsck, Stream.generate(() -> {
+      return ERROR_CODE.UNDELETED_REPLICATION_QUEUE;
+    }).limit(10).toArray(ERROR_CODE[]::new));
+
+    List<ServerName> replicators = queueStorage.getListOfReplicators();
+    // should not remove the server with the queue for peerId2
+    assertEquals(1, replicators.size());
+    assertEquals(ServerName.valueOf("localhost", 10000, 100000), replicators.get(0));
+    for (String queueId : queueStorage.getAllQueues(replicators.get(0))) {
+      assertEquals(peerId2, queueId);
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
index 60d732403a1..99e4f089024 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/hbck/HbckTestingUtil.java
@@ -46,7 +46,7 @@ public class HbckTestingUtil {
   public static HBaseFsck doFsck(Configuration conf, boolean fixAssignments, boolean fixMeta,
       boolean fixHdfsHoles, boolean fixHdfsOverlaps, boolean fixHdfsOrphans,
       boolean fixTableOrphans, boolean fixVersionFile, boolean fixReferenceFiles,
-      boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
-      Boolean fixReplication, TableName table) throws Exception {
+      boolean fixHFileLinks, boolean fixEmptyMetaRegionInfo, boolean fixTableLocks,
+      boolean fixReplication, TableName table) throws Exception {
     HBaseFsck fsck = new HBaseFsck(conf, exec);
     try {
@@ -78,10 +78,8 @@ public class HbckTestingUtil {
   /**
    * Runs hbck with the -sidelineCorruptHFiles option
-   * @param conf
    * @param table table constraint
-   * @return
-   * @throws Exception
+   * @return hbckInstance
    */
   public static HBaseFsck doHFileQuarantine(Configuration conf, TableName table) throws Exception {
     String[] args = {"-sidelineCorruptHFiles", "-ignorePreCheckPermission",
      table.getNameAsString()};
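The Boolean-to-boolean change above means fixReplication can no longer be null; call sites now pass a primitive. For reference, a sketch of the long-form overload with only replication fixing enabled (flag order follows the parameter list above; a null table means no table constraint):

HBaseFsck fsck = HbckTestingUtil.doFsck(conf,
    false, // fixAssignments
    false, // fixMeta
    false, // fixHdfsHoles
    false, // fixHdfsOverlaps
    false, // fixHdfsOrphans
    false, // fixTableOrphans
    false, // fixVersionFile
    false, // fixReferenceFiles
    false, // fixHFileLinks
    false, // fixEmptyMetaRegionInfo
    false, // fixTableLocks
    true,  // fixReplication, a primitive boolean after this patch
    null); // table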