HBASE-26482 HMaster may clean wals that is replicating in rare cases (#3876)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
e4ad6e9f64
commit
60e263e03b
|
@ -379,6 +379,11 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This implement must update the cversion of root {@link #queuesZNode}. The optimistic lock of
|
||||||
|
* the {@link #getAllWALs()} method is based on the cversion of root {@link #queuesZNode}.
|
||||||
|
* @see #getAllWALs() to show the usage of the cversion of root {@link #queuesZNode} .
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
|
public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
|
||||||
ServerName destServerName) throws ReplicationException {
|
ServerName destServerName) throws ReplicationException {
|
||||||
|
@ -417,6 +422,12 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||||
}
|
}
|
||||||
// add delete op for peer
|
// add delete op for peer
|
||||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
|
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode));
|
||||||
|
// Append new queue id for prevent lock competition in zookeeper server.
|
||||||
|
String claimLockZNode = ZNodePaths.joinZNode(queuesZNode, "cversion_" + newQueueId);
|
||||||
|
// A trick for update the cversion of root queuesZNode .
|
||||||
|
// The optimistic lock of the getAllWALs() method is based on the cversion of root queuesZNode
|
||||||
|
listOfOps.add(ZKUtilOp.createAndFailSilent(claimLockZNode, HConstants.EMPTY_BYTE_ARRAY));
|
||||||
|
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(claimLockZNode));
|
||||||
|
|
||||||
LOG.trace("The multi list size is {}", listOfOps.size());
|
LOG.trace("The multi list size is {}", listOfOps.size());
|
||||||
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
||||||
|
@ -505,6 +516,13 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||||
return stat.getCversion();
|
return stat.getCversion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The optimistic lock of this implement is based on the cversion of root {@link #queuesZNode}.
|
||||||
|
* Therefore, we must update the cversion of root {@link #queuesZNode} when migrate wal nodes to
|
||||||
|
* other queues.
|
||||||
|
* @see #claimQueue(ServerName, String, ServerName) as an example of updating root
|
||||||
|
* {@link #queuesZNode} cversion.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public Set<String> getAllWALs() throws ReplicationException {
|
public Set<String> getAllWALs() throws ReplicationException {
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -206,18 +206,29 @@ public class TestZKReplicationQueueStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// For HBASE-12865
|
// For HBASE-12865, HBASE-26482
|
||||||
@Test
|
@Test
|
||||||
public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
|
public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
|
||||||
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
|
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
|
||||||
STORAGE.addWAL(serverName1, "1", "file");
|
STORAGE.addWAL(serverName1, "1", "file");
|
||||||
|
STORAGE.addWAL(serverName1, "2", "file");
|
||||||
|
|
||||||
|
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
|
||||||
|
// Avoid claimQueue update cversion for prepare server2 rsNode.
|
||||||
|
STORAGE.addWAL(serverName2, "1", "file");
|
||||||
|
STORAGE.addWAL(serverName2, "2", "file");
|
||||||
|
|
||||||
int v0 = STORAGE.getQueuesZNodeCversion();
|
int v0 = STORAGE.getQueuesZNodeCversion();
|
||||||
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
|
|
||||||
STORAGE.claimQueue(serverName1, "1", serverName2);
|
STORAGE.claimQueue(serverName1, "1", serverName2);
|
||||||
int v1 = STORAGE.getQueuesZNodeCversion();
|
int v1 = STORAGE.getQueuesZNodeCversion();
|
||||||
// cversion should increase by 1 since a child node is deleted
|
// cversion should be increased by claimQueue method.
|
||||||
assertEquals(1, v1 - v0);
|
assertTrue(v1 > v0);
|
||||||
|
|
||||||
|
STORAGE.claimQueue(serverName1, "2", serverName2);
|
||||||
|
int v2 = STORAGE.getQueuesZNodeCversion();
|
||||||
|
// cversion should be increased by claimQueue method.
|
||||||
|
assertTrue(v2 > v1);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
|
private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue