diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index 5c480bacdd8..c51bdfcc283 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -379,6 +379,11 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase return 0; } + /** + * This implement must update the cversion of root {@link #queuesZNode}. The optimistic lock of + * the {@link #getAllWALs()} method is based on the cversion of root {@link #queuesZNode}. + * @see #getAllWALs() to show the usage of the cversion of root {@link #queuesZNode} . + */ @Override public Pair> claimQueue(ServerName sourceServerName, String queueId, ServerName destServerName) throws ReplicationException { @@ -417,6 +422,12 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } // add delete op for peer listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldQueueNode)); + // Append new queue id for prevent lock competition in zookeeper server. + String claimLockZNode = ZNodePaths.joinZNode(queuesZNode, "cversion_" + newQueueId); + // A trick for update the cversion of root queuesZNode . + // The optimistic lock of the getAllWALs() method is based on the cversion of root queuesZNode + listOfOps.add(ZKUtilOp.createAndFailSilent(claimLockZNode, HConstants.EMPTY_BYTE_ARRAY)); + listOfOps.add(ZKUtilOp.deleteNodeFailSilent(claimLockZNode)); LOG.trace("The multi list size is {}", listOfOps.size()); ZKUtil.multiOrSequential(zookeeper, listOfOps, false); @@ -505,6 +516,13 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase return stat.getCversion(); } + /** + * The optimistic lock of this implement is based on the cversion of root {@link #queuesZNode}. + * Therefore, we must update the cversion of root {@link #queuesZNode} when migrate wal nodes to + * other queues. + * @see #claimQueue(ServerName, String, ServerName) as an example of updating root + * {@link #queuesZNode} cversion. + */ @Override public Set getAllWALs() throws ReplicationException { try { diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index 5080f6cb609..a2ca0d96fa5 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -206,18 +206,29 @@ public class TestZKReplicationQueueStorage { } } - // For HBASE-12865 + // For HBASE-12865, HBASE-26482 @Test public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException { ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000); STORAGE.addWAL(serverName1, "1", "file"); + STORAGE.addWAL(serverName1, "2", "file"); + + ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); + // Avoid claimQueue update cversion for prepare server2 rsNode. + STORAGE.addWAL(serverName2, "1", "file"); + STORAGE.addWAL(serverName2, "2", "file"); int v0 = STORAGE.getQueuesZNodeCversion(); - ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001); + STORAGE.claimQueue(serverName1, "1", serverName2); int v1 = STORAGE.getQueuesZNodeCversion(); - // cversion should increase by 1 since a child node is deleted - assertEquals(1, v1 - v0); + // cversion should be increased by claimQueue method. + assertTrue(v1 > v0); + + STORAGE.claimQueue(serverName1, "2", serverName2); + int v2 = STORAGE.getQueuesZNodeCversion(); + // cversion should be increased by claimQueue method. + assertTrue(v2 > v1); } private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {