From 8d96fc36143d3b217493f87f93a794f50ae4c744 Mon Sep 17 00:00:00 2001 From: zhengzhuobinzzb Date: Sun, 5 Dec 2021 22:26:20 +0800 Subject: [PATCH] HBASE-26482 HMaster may clean wals that is replicating in rare cases (#3887) Signed-off-by: Duo Zhang --- .../replication/ReplicationQueuesClient.java | 8 +++++ .../ReplicationQueuesClientZKImpl.java | 14 ++++++++ .../master/ReplicationLogCleaner.java | 6 +++- .../hbase/master/cleaner/TestLogsCleaner.java | 34 +++++++++++++++++++ .../TestReplicationStateZKImpl.java | 7 ++++ 5 files changed, 68 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java index 7fa3bbb0930..6b4c869f784 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -67,6 +68,13 @@ public interface ReplicationQueuesClient { */ int getQueuesZNodeCversion() throws KeeperException; + /** + * Get a map of cversion of all replicator nodes. This can be used as optimistic locking + * to get a consistent snapshot of the replication queues. + * @return a map of replicator to cversion + */ + Map getReplicatorsZNodeCversion() throws KeeperException; + /** * Get the change version number of replication hfile references node. This can be used as * optimistic locking to get a consistent snapshot of the replication queues of hfile references. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java index 14b43348ec4..1f7538c2a81 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java @@ -18,7 +18,9 @@ */ package org.apache.hadoop.hbase.replication; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -92,6 +94,18 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem } } + @Override public Map getReplicatorsZNodeCversion() + throws KeeperException { + List rss = super.getListOfReplicatorsZK(); + Map rsToCversion = new HashMap<>(); + for (String rs : rss) { + Stat stat = new Stat(); + ZKUtil.getDataNoWatch(this.zookeeper, ZKUtil.joinZNode(this.queuesZNode, rs), stat); + rsToCversion.put(rs, stat.getCversion()); + } + return rsToCversion; + } + @Override public int getHFileRefsNodeChangeVersion() throws KeeperException { Stat stat = new Stat(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java index 7ac84896a0c..cfb1afb02ef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java @@ -106,6 +106,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { LOG.debug("Didn't find any region server that replicates, won't prevent any deletions."); return ImmutableSet.of(); } + // We should also check cversions of all rs nodes to Prevent missing of WAL which are claiming + // by other regionServer. For details, please see HBASE-26482 + Map rsToCversionBefore = replicationQueues.getReplicatorsZNodeCversion(); Set wals = Sets.newHashSet(); for (String rs : rss) { List listOfPeers = replicationQueues.getAllQueues(rs); @@ -121,7 +124,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate { } } int v1 = replicationQueues.getQueuesZNodeCversion(); - if (v0 == v1) { + Map rsToCversionAfter = replicationQueues.getReplicatorsZNodeCversion(); + if (v0 == v1 && rsToCversionBefore.equals(rsToCversionAfter)) { return wals; } LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d", diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 39cbc96425e..d4c305d5209 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -41,6 +42,7 @@ import java.util.Map; import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -188,6 +190,9 @@ public class TestLogsCleaner { ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class); Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4); + // Avoid direct return because there no replicator. + Mockito.when(rqcMock.getListOfReplicators()) + .thenReturn(Lists.newArrayList("s1", "s2")); Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues"); rqc.setAccessible(true); @@ -196,6 +201,35 @@ public class TestLogsCleaner { // This should return eventually when cversion stabilizes cleaner.getDeletableFiles(new LinkedList()); + // Test did get an optimistic lock + Mockito.verify(rqcMock, atLeast(5)).getQueuesZNodeCversion(); + } + + @Test + public void testReplicatorZnodeCversionChange() + throws KeeperException, NoSuchFieldException, IllegalAccessException { + Configuration conf = TEST_UTIL.getConfiguration(); + ReplicationLogCleaner cleaner = new ReplicationLogCleaner(); + cleaner.setConf(conf); + + ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class); + // Avoid direct return because there no replicator. + Mockito.when(rqcMock.getListOfReplicators()).thenReturn(Lists.newArrayList("s1", "s2")); + Mockito.when(rqcMock.getReplicatorsZNodeCversion()).thenReturn( + ImmutableMap.of("s1", 0, "s2", 0), + ImmutableMap.of("s1", 1, "s2", 1), + ImmutableMap.of("s1", 2, "s2", 2), + ImmutableMap.of("s1", 3, "s2", 3)); + + Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues"); + rqc.setAccessible(true); + + rqc.set(cleaner, rqcMock); + + // This should return eventually when cversion stabilizes + cleaner.getDeletableFiles(new LinkedList()); + // Test did get an optimistic lock + Mockito.verify(rqcMock, atLeast(5)).getReplicatorsZNodeCversion(); } @Test(timeout=10000) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java index f8877dce1aa..07aacebe3ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java @@ -127,6 +127,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic { assertTrue(rqZK.isPeerPath(peerPath)); } + @Test + public void testZNodeCversion() throws ReplicationException, KeeperException { + rq1.init(server1); + + assertTrue(rqc.getReplicatorsZNodeCversion().containsKey(server1)); + } + static class DummyServer implements Server { private String serverName; private boolean isAborted = false;