From 7a1d00c7a0719ccb8d97b42965835f124ef27804 Mon Sep 17 00:00:00 2001 From: huzheng Date: Wed, 21 Mar 2018 17:34:10 +0800 Subject: [PATCH] HBASE-20138 Find a way to deal with the conflicts when updating replication position --- .../ZKReplicationQueueStorage.java | 110 ++++++++++++------ .../TestReplicationStateBasic.java | 6 + .../TestZKReplicationQueueStorage.java | 48 +++++++- .../apache/hadoop/hbase/zookeeper/ZKUtil.java | 34 ++++-- .../hadoop/hbase/zookeeper/TestZKUtil.java | 25 ++++ 5 files changed, 178 insertions(+), 45 deletions(-) diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java index e5a498a3d6c..19986f17e68 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java @@ -204,20 +204,25 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase } private void addLastSeqIdsToOps(String queueId, Map lastSeqIds, - List listOfOps) throws KeeperException { + List listOfOps) throws KeeperException, ReplicationException { + String peerId = new ReplicationQueueInfo(queueId).getPeerId(); for (Entry lastSeqEntry : lastSeqIds.entrySet()) { - String peerId = new ReplicationQueueInfo(queueId).getPeerId(); String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); - /* - * Make sure the existence of path - * /hbase/replication/regions//-. As the javadoc in - * multiOrSequential() method said, if received a NodeExistsException, all operations will - * fail. So create the path here, and in fact, no need to add this operation to listOfOps, - * because only need to make sure that update file position and sequence id atomically. - */ - ZKUtil.createWithParents(zookeeper, path); - // Persist the max sequence id of region to zookeeper. - listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); + Pair p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId); + byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue()); + if (p.getSecond() < 0) { // ZNode does not exist. + ZKUtil.createWithParents(zookeeper, + path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR))); + listOfOps.add(ZKUtilOp.createAndFailSilent(path, data)); + continue; + } + // Perform CAS in a specific version v0 (HBASE-20138) + int v0 = p.getSecond(); + long lastPushedSeqId = p.getFirst(); + if (lastSeqEntry.getValue() <= lastPushedSeqId) { + continue; + } + listOfOps.add(ZKUtilOp.setData(path, data, v0)); } } @@ -225,50 +230,85 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase public void setWALPosition(ServerName serverName, String queueId, String fileName, long position, Map lastSeqIds) throws ReplicationException { try { - List listOfOps = new ArrayList<>(); - if (position > 0) { - listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), - ZKUtil.positionToByteArray(position))); + for (int retry = 0;; retry++) { + List listOfOps = new ArrayList<>(); + if (position > 0) { + listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName), + ZKUtil.positionToByteArray(position))); + } + // Persist the max sequence id(s) of regions for serial replication atomically. + addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps); + if (listOfOps.isEmpty()) { + return; + } + try { + ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + return; + } catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) { + LOG.warn( + "Bad version(or node exist) when persist the last pushed sequence id to zookeeper storage, " + + "Retry = " + retry + ", serverName=" + serverName + ", queueId=" + queueId + + ", fileName=" + fileName); + } } - // Persist the max sequence id(s) of regions for serial replication atomically. - addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps); - ZKUtil.multiOrSequential(zookeeper, listOfOps, false); } catch (KeeperException e) { throw new ReplicationException("Failed to set log position (serverName=" + serverName + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e); } } - @Override - public long getLastSequenceId(String encodedRegionName, String peerId) - throws ReplicationException { - byte[] data; - try { - data = - ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId)); - } catch (KeeperException | InterruptedException e) { - throw new ReplicationException("Failed to get the last sequence id(region=" - + encodedRegionName + ", peerId=" + peerId + ")"); + /** + * Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means + * that the ZNode does not exist. + */ + @VisibleForTesting + protected Pair getLastSequenceIdWithVersion(String encodedRegionName, + String peerId) throws KeeperException { + Stat stat = new Stat(); + String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId); + byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat); + if (data == null) { + // ZNode does not exist, so just return version -1 to indicate that no node exist. + return Pair.newPair(HConstants.NO_SEQNUM, -1); } try { - return ZKUtil.parseWALPositionFrom(data); + return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion()); } catch (DeserializationException de) { LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId + "), data=" + Bytes.toStringBinary(data)); } - return HConstants.NO_SEQNUM; + return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion()); + } + + @Override + public long getLastSequenceId(String encodedRegionName, String peerId) + throws ReplicationException { + try { + return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst(); + } catch (KeeperException e) { + throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName=" + + encodedRegionName + ", peerId=" + peerId + ")", e); + } } @Override public void setLastSequenceIds(String peerId, Map lastSeqIds) throws ReplicationException { try { + // No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers + // only, so no conflict happen. List listOfOps = new ArrayList<>(); - addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps); - ZKUtil.multiOrSequential(zookeeper, listOfOps, false); + for (Entry lastSeqEntry : lastSeqIds.entrySet()) { + String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId); + ZKUtil.createWithParents(zookeeper, path); + listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue()))); + } + if (!listOfOps.isEmpty()) { + ZKUtil.multiOrSequential(zookeeper, listOfOps, true); + } } catch (KeeperException e) { - throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId + - ", lastSeqIds.size=" + lastSeqIds.size(), e); + throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId + + ", size of lastSeqIds=" + lastSeqIds.size(), e); } } diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java index 3ed41217ce6..437804c05a6 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java @@ -316,6 +316,12 @@ public abstract class TestReplicationStateBasic { } assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); assertEquals(1000L, rqs.getLastSequenceId(region1, queue1)); + + // Try to decrease the last pushed id by setWALPosition method. + rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100, + ImmutableMap.of(region0, 899L, region1, 1001L)); + assertEquals(900L, rqs.getLastSequenceId(region0, queue1)); + assertEquals(1001L, rqs.getLastSequenceId(region1, queue1)); } protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception { diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java index ca86a05d456..5821271db9e 100644 --- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java +++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; import java.util.SortedSet; + import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseZKTestingUtility; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.zookeeper.KeeperException; import org.junit.After; import org.junit.AfterClass; @@ -44,6 +46,8 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + @Category({ ReplicationTests.class, MediumTests.class }) public class TestZKReplicationQueueStorage { @@ -215,10 +219,11 @@ public class TestZKReplicationQueueStorage { assertEquals(1, v1 - v0); } - private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException { + private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException { return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) { private int called = 0; + private int getLastSeqIdOpIndex = 0; @Override protected int getQueuesZNodeCversion() throws KeeperException { @@ -227,12 +232,26 @@ public class TestZKReplicationQueueStorage { } return called; } + + @Override + protected Pair getLastSequenceIdWithVersion(String encodedRegionName, + String peerId) throws KeeperException { + Pair oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId); + if (getLastSeqIdOpIndex < 100) { + // Let the ZNode version increase. + String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId); + ZKUtil.createWithParents(zookeeper, path); + ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L)); + } + getLastSeqIdOpIndex++; + return oldPair; + } }; } @Test public void testGetAllWALsCversionChange() throws IOException, ReplicationException { - ZKReplicationQueueStorage storage = createWithUnstableCversion(); + ZKReplicationQueueStorage storage = createWithUnstableVersion(); storage.addWAL(getServerName(0), "1", "file"); // This should return eventually when cversion stabilizes Set allWals = storage.getAllWALs(); @@ -243,7 +262,7 @@ public class TestZKReplicationQueueStorage { // For HBASE-14621 @Test public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException { - ZKReplicationQueueStorage storage = createWithUnstableCversion(); + ZKReplicationQueueStorage storage = createWithUnstableVersion(); storage.addPeerToHFileRefs("1"); Path p = new Path("/test"); storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p))); @@ -253,6 +272,29 @@ public class TestZKReplicationQueueStorage { assertThat(allHFileRefs, hasItems("test")); } + // For HBASE-20138 + @Test + public void testSetWALPositionBadVersion() throws IOException, ReplicationException { + ZKReplicationQueueStorage storage = createWithUnstableVersion(); + ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000); + assertTrue(storage.getAllQueues(serverName1).isEmpty()); + String queue1 = "1"; + String fileName = getFileName("file1", 0); + String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6"; + storage.addWAL(serverName1, queue1, fileName); + + List wals1 = storage.getWALsInQueue(serverName1, queue1); + assertEquals(1, wals1.size()); + + assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName)); + // This should return eventually when data version stabilizes + storage.setWALPosition(serverName1, queue1, fileName, 100, + ImmutableMap.of(encodedRegionName, 120L)); + + assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName)); + assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1)); + } + @Test public void testRegionsZNodeLayout() throws Exception { String peerId = "1"; diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java index 7bb47527597..75ad0cb62b6 100644 --- a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java @@ -1500,10 +1500,17 @@ public final class ZKUtil { /** * @return a setData ZKUtilOp */ - public static ZKUtilOp setData(String path, byte [] data) { + public static ZKUtilOp setData(String path, byte[] data) { return new SetData(path, data); } + /** + * @return a setData ZKUtilOp + */ + public static ZKUtilOp setData(String path, byte[] data, int version) { + return new SetData(path, data, version); + } + /** * @return path to znode where the ZKOp will occur */ @@ -1578,17 +1585,28 @@ public final class ZKUtil { * ZKUtilOp representing setData in ZooKeeper */ public static final class SetData extends ZKUtilOp { - private byte [] data; + private byte[] data; + private int version = -1; - private SetData(String path, byte [] data) { + private SetData(String path, byte[] data) { super(path); this.data = data; } + private SetData(String path, byte[] data, int version) { + super(path); + this.data = data; + this.version = version; + } + public byte[] getData() { return data; } + public int getVersion() { + return version; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -1599,13 +1617,15 @@ public final class ZKUtil { } SetData op = (SetData) o; - return getPath().equals(op.getPath()) && Arrays.equals(data, op.data); + return getPath().equals(op.getPath()) && Arrays.equals(data, op.data) + && getVersion() == op.getVersion(); } @Override public int hashCode() { int ret = getPath().hashCode(); - return ret * 31 + Bytes.hashCode(data); + ret = ret * 31 + Bytes.hashCode(data); + return ret * 31 + Integer.hashCode(version); } } } @@ -1626,8 +1646,8 @@ public final class ZKUtil { DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op; return Op.delete(dnfs.getPath(), -1); } else if (op instanceof SetData) { - SetData sd = (SetData)op; - return Op.setData(sd.getPath(), sd.getData(), -1); + SetData sd = (SetData) op; + return Op.setData(sd.getPath(), sd.getData(), sd.getVersion()); } else { throw new UnsupportedOperationException("Unexpected ZKUtilOp type: " + op.getClass().getName()); diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java index 6c3279a8182..1508441181c 100644 --- a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestZKUtil.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.zookeeper; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.ZKTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs; @@ -46,6 +48,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.io.Closeables; @Category({ ZKTests.class, MediumTests.class }) @@ -117,6 +120,28 @@ public class TestZKUtil { assertNull(ZKUtil.getDataNoWatch(ZKW, "/l1/l2", null)); } + private int getZNodeDataVersion(String znode) throws KeeperException { + Stat stat = new Stat(); + ZKUtil.getDataNoWatch(ZKW, znode, stat); + return stat.getVersion(); + } + + @Test + public void testSetDataWithVersion() throws Exception { + ZKUtil.createWithParents(ZKW, "/s1/s2/s3"); + int v0 = getZNodeDataVersion("/s1/s2/s3"); + assertEquals(0, v0); + + ZKUtil.setData(ZKW, "/s1/s2/s3", Bytes.toBytes(12L)); + int v1 = getZNodeDataVersion("/s1/s2/s3"); + assertEquals(1, v1); + + ZKUtil.multiOrSequential(ZKW, + ImmutableList.of(ZKUtilOp.setData("/s1/s2/s3", Bytes.toBytes(13L), v1)), false); + int v2 = getZNodeDataVersion("/s1/s2/s3"); + assertEquals(2, v2); + } + /** * A test for HBASE-3238 * @throws IOException A connection attempt to zk failed