HBASE-26482 HMaster may clean wals that is replicating in rare cases (#3887)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
c472329460
commit
8d96fc3614
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -67,6 +68,13 @@ public interface ReplicationQueuesClient {
|
||||||
*/
|
*/
|
||||||
int getQueuesZNodeCversion() throws KeeperException;
|
int getQueuesZNodeCversion() throws KeeperException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a map of cversion of all replicator nodes. This can be used as optimistic locking
|
||||||
|
* to get a consistent snapshot of the replication queues.
|
||||||
|
* @return a map of replicator to cversion
|
||||||
|
*/
|
||||||
|
Map<String, Integer> getReplicatorsZNodeCversion() throws KeeperException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the change version number of replication hfile references node. This can be used as
|
* Get the change version number of replication hfile references node. This can be used as
|
||||||
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
|
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.replication;
|
package org.apache.hadoop.hbase.replication;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -92,6 +94,18 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override public Map<String, Integer> getReplicatorsZNodeCversion()
|
||||||
|
throws KeeperException {
|
||||||
|
List<String> rss = super.getListOfReplicatorsZK();
|
||||||
|
Map<String, Integer> rsToCversion = new HashMap<>();
|
||||||
|
for (String rs : rss) {
|
||||||
|
Stat stat = new Stat();
|
||||||
|
ZKUtil.getDataNoWatch(this.zookeeper, ZKUtil.joinZNode(this.queuesZNode, rs), stat);
|
||||||
|
rsToCversion.put(rs, stat.getCversion());
|
||||||
|
}
|
||||||
|
return rsToCversion;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getHFileRefsNodeChangeVersion() throws KeeperException {
|
public int getHFileRefsNodeChangeVersion() throws KeeperException {
|
||||||
Stat stat = new Stat();
|
Stat stat = new Stat();
|
||||||
|
|
|
@ -106,6 +106,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
||||||
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
|
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
|
||||||
return ImmutableSet.of();
|
return ImmutableSet.of();
|
||||||
}
|
}
|
||||||
|
// We should also check the cversions of all rs nodes to avoid missing WALs that are being claimed
|
||||||
|
// by other region servers. For details, please see HBASE-26482.
|
||||||
|
Map<String, Integer> rsToCversionBefore = replicationQueues.getReplicatorsZNodeCversion();
|
||||||
Set<String> wals = Sets.newHashSet();
|
Set<String> wals = Sets.newHashSet();
|
||||||
for (String rs : rss) {
|
for (String rs : rss) {
|
||||||
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
|
List<String> listOfPeers = replicationQueues.getAllQueues(rs);
|
||||||
|
@ -121,7 +124,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
int v1 = replicationQueues.getQueuesZNodeCversion();
|
int v1 = replicationQueues.getQueuesZNodeCversion();
|
||||||
if (v0 == v1) {
|
Map<String, Integer> rsToCversionAfter = replicationQueues.getReplicatorsZNodeCversion();
|
||||||
|
if (v0 == v1 && rsToCversionBefore.equals(rsToCversionAfter)) {
|
||||||
return wals;
|
return wals;
|
||||||
}
|
}
|
||||||
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
|
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
|
||||||
|
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.atLeast;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.doReturn;
|
import static org.mockito.Mockito.doReturn;
|
||||||
import static org.mockito.Mockito.doThrow;
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
@ -41,6 +42,7 @@ import java.util.Map;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@ -188,6 +190,9 @@ public class TestLogsCleaner {
|
||||||
|
|
||||||
ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
|
ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
|
||||||
Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
|
Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
|
||||||
|
// Return a non-empty replicator list so the cleaner does not return early for lack of replicators.
|
||||||
|
Mockito.when(rqcMock.getListOfReplicators())
|
||||||
|
.thenReturn(Lists.newArrayList("s1", "s2"));
|
||||||
|
|
||||||
Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
|
Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
|
||||||
rqc.setAccessible(true);
|
rqc.setAccessible(true);
|
||||||
|
@ -196,6 +201,35 @@ public class TestLogsCleaner {
|
||||||
|
|
||||||
// This should return eventually when cversion stabilizes
|
// This should return eventually when cversion stabilizes
|
||||||
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
|
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
|
||||||
|
// Verify that the queues znode cversion was re-read repeatedly, i.e. the optimistic-lock check was retried
|
||||||
|
Mockito.verify(rqcMock, atLeast(5)).getQueuesZNodeCversion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReplicatorZnodeCversionChange()
|
||||||
|
throws KeeperException, NoSuchFieldException, IllegalAccessException {
|
||||||
|
Configuration conf = TEST_UTIL.getConfiguration();
|
||||||
|
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
|
||||||
|
cleaner.setConf(conf);
|
||||||
|
|
||||||
|
ReplicationQueuesClient rqcMock = mock(ReplicationQueuesClient.class);
|
||||||
|
// Return a non-empty replicator list so the cleaner does not return early for lack of replicators.
|
||||||
|
Mockito.when(rqcMock.getListOfReplicators()).thenReturn(Lists.newArrayList("s1", "s2"));
|
||||||
|
Mockito.when(rqcMock.getReplicatorsZNodeCversion()).thenReturn(
|
||||||
|
ImmutableMap.of("s1", 0, "s2", 0),
|
||||||
|
ImmutableMap.of("s1", 1, "s2", 1),
|
||||||
|
ImmutableMap.of("s1", 2, "s2", 2),
|
||||||
|
ImmutableMap.of("s1", 3, "s2", 3));
|
||||||
|
|
||||||
|
Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
|
||||||
|
rqc.setAccessible(true);
|
||||||
|
|
||||||
|
rqc.set(cleaner, rqcMock);
|
||||||
|
|
||||||
|
// This should return eventually when cversion stabilizes
|
||||||
|
cleaner.getDeletableFiles(new LinkedList<FileStatus>());
|
||||||
|
// Verify that the replicator cversions were re-read repeatedly, i.e. the optimistic-lock check was retried
|
||||||
|
Mockito.verify(rqcMock, atLeast(5)).getReplicatorsZNodeCversion();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
|
|
|
@ -127,6 +127,13 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
|
||||||
assertTrue(rqZK.isPeerPath(peerPath));
|
assertTrue(rqZK.isPeerPath(peerPath));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZNodeCversion() throws ReplicationException, KeeperException {
|
||||||
|
rq1.init(server1);
|
||||||
|
|
||||||
|
assertTrue(rqc.getReplicatorsZNodeCversion().containsKey(server1));
|
||||||
|
}
|
||||||
|
|
||||||
static class DummyServer implements Server {
|
static class DummyServer implements Server {
|
||||||
private String serverName;
|
private String serverName;
|
||||||
private boolean isAborted = false;
|
private boolean isAborted = false;
|
||||||
|
|
Loading…
Reference in New Issue