HBASE-20138 Find a way to deal with the conflicts when updating replication position
This commit is contained in:
parent
e9701a0595
commit
7a1d00c7a0
|
@ -204,20 +204,25 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||||
}
|
}
|
||||||
|
|
||||||
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
|
private void addLastSeqIdsToOps(String queueId, Map<String, Long> lastSeqIds,
|
||||||
List<ZKUtilOp> listOfOps) throws KeeperException {
|
List<ZKUtilOp> listOfOps) throws KeeperException, ReplicationException {
|
||||||
|
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
|
||||||
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
|
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
|
||||||
String peerId = new ReplicationQueueInfo(queueId).getPeerId();
|
|
||||||
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
|
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
|
||||||
/*
|
Pair<Long, Integer> p = getLastSequenceIdWithVersion(lastSeqEntry.getKey(), peerId);
|
||||||
* Make sure the existence of path
|
byte[] data = ZKUtil.positionToByteArray(lastSeqEntry.getValue());
|
||||||
* /hbase/replication/regions/<hash>/<encoded-region-name>-<peer-id>. As the javadoc in
|
if (p.getSecond() < 0) { // ZNode does not exist.
|
||||||
* multiOrSequential() method said, if received a NodeExistsException, all operations will
|
ZKUtil.createWithParents(zookeeper,
|
||||||
* fail. So create the path here, and in fact, no need to add this operation to listOfOps,
|
path.substring(0, path.lastIndexOf(ZNodePaths.ZNODE_PATH_SEPARATOR)));
|
||||||
* because only need to make sure that update file position and sequence id atomically.
|
listOfOps.add(ZKUtilOp.createAndFailSilent(path, data));
|
||||||
*/
|
continue;
|
||||||
ZKUtil.createWithParents(zookeeper, path);
|
}
|
||||||
// Persist the max sequence id of region to zookeeper.
|
// Perform CAS in a specific version v0 (HBASE-20138)
|
||||||
listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
|
int v0 = p.getSecond();
|
||||||
|
long lastPushedSeqId = p.getFirst();
|
||||||
|
if (lastSeqEntry.getValue() <= lastPushedSeqId) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
listOfOps.add(ZKUtilOp.setData(path, data, v0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,50 +230,85 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||||
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
|
public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
|
||||||
Map<String, Long> lastSeqIds) throws ReplicationException {
|
Map<String, Long> lastSeqIds) throws ReplicationException {
|
||||||
try {
|
try {
|
||||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
for (int retry = 0;; retry++) {
|
||||||
if (position > 0) {
|
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||||
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
|
if (position > 0) {
|
||||||
ZKUtil.positionToByteArray(position)));
|
listOfOps.add(ZKUtilOp.setData(getFileNode(serverName, queueId, fileName),
|
||||||
|
ZKUtil.positionToByteArray(position)));
|
||||||
|
}
|
||||||
|
// Persist the max sequence id(s) of regions for serial replication atomically.
|
||||||
|
addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
|
||||||
|
if (listOfOps.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
||||||
|
return;
|
||||||
|
} catch (KeeperException.BadVersionException | KeeperException.NodeExistsException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Bad version(or node exist) when persist the last pushed sequence id to zookeeper storage, "
|
||||||
|
+ "Retry = " + retry + ", serverName=" + serverName + ", queueId=" + queueId
|
||||||
|
+ ", fileName=" + fileName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Persist the max sequence id(s) of regions for serial replication atomically.
|
|
||||||
addLastSeqIdsToOps(queueId, lastSeqIds, listOfOps);
|
|
||||||
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new ReplicationException("Failed to set log position (serverName=" + serverName
|
throw new ReplicationException("Failed to set log position (serverName=" + serverName
|
||||||
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
|
+ ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position + ")", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
/**
|
||||||
public long getLastSequenceId(String encodedRegionName, String peerId)
|
* Return the {lastPushedSequenceId, ZNodeDataVersion} pair. if ZNodeDataVersion is -1, it means
|
||||||
throws ReplicationException {
|
* that the ZNode does not exist.
|
||||||
byte[] data;
|
*/
|
||||||
try {
|
@VisibleForTesting
|
||||||
data =
|
protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
|
||||||
ZKUtil.getData(zookeeper, getSerialReplicationRegionPeerNode(encodedRegionName, peerId));
|
String peerId) throws KeeperException {
|
||||||
} catch (KeeperException | InterruptedException e) {
|
Stat stat = new Stat();
|
||||||
throw new ReplicationException("Failed to get the last sequence id(region="
|
String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
|
||||||
+ encodedRegionName + ", peerId=" + peerId + ")");
|
byte[] data = ZKUtil.getDataNoWatch(zookeeper, path, stat);
|
||||||
|
if (data == null) {
|
||||||
|
// ZNode does not exist, so just return version -1 to indicate that no node exist.
|
||||||
|
return Pair.newPair(HConstants.NO_SEQNUM, -1);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
return ZKUtil.parseWALPositionFrom(data);
|
return Pair.newPair(ZKUtil.parseWALPositionFrom(data), stat.getVersion());
|
||||||
} catch (DeserializationException de) {
|
} catch (DeserializationException de) {
|
||||||
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
|
LOG.warn("Failed to parse log position (region=" + encodedRegionName + ", peerId=" + peerId
|
||||||
+ "), data=" + Bytes.toStringBinary(data));
|
+ "), data=" + Bytes.toStringBinary(data));
|
||||||
}
|
}
|
||||||
return HConstants.NO_SEQNUM;
|
return Pair.newPair(HConstants.NO_SEQNUM, stat.getVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getLastSequenceId(String encodedRegionName, String peerId)
|
||||||
|
throws ReplicationException {
|
||||||
|
try {
|
||||||
|
return getLastSequenceIdWithVersion(encodedRegionName, peerId).getFirst();
|
||||||
|
} catch (KeeperException e) {
|
||||||
|
throw new ReplicationException("Failed to get last pushed sequence id (encodedRegionName="
|
||||||
|
+ encodedRegionName + ", peerId=" + peerId + ")", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
|
public void setLastSequenceIds(String peerId, Map<String, Long> lastSeqIds)
|
||||||
throws ReplicationException {
|
throws ReplicationException {
|
||||||
try {
|
try {
|
||||||
|
// No need CAS and retry here, because it'll call setLastSequenceIds() for disabled peers
|
||||||
|
// only, so no conflict happen.
|
||||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||||
addLastSeqIdsToOps(peerId, lastSeqIds, listOfOps);
|
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
|
||||||
ZKUtil.multiOrSequential(zookeeper, listOfOps, false);
|
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
|
||||||
|
ZKUtil.createWithParents(zookeeper, path);
|
||||||
|
listOfOps.add(ZKUtilOp.setData(path, ZKUtil.positionToByteArray(lastSeqEntry.getValue())));
|
||||||
|
}
|
||||||
|
if (!listOfOps.isEmpty()) {
|
||||||
|
ZKUtil.multiOrSequential(zookeeper, listOfOps, true);
|
||||||
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId +
|
throw new ReplicationException("Failed to set last sequence ids, peerId=" + peerId
|
||||||
", lastSeqIds.size=" + lastSeqIds.size(), e);
|
+ ", size of lastSeqIds=" + lastSeqIds.size(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -316,6 +316,12 @@ public abstract class TestReplicationStateBasic {
|
||||||
}
|
}
|
||||||
assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
|
assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
|
||||||
assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
|
assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
|
||||||
|
|
||||||
|
// Try to decrease the last pushed id by setWALPosition method.
|
||||||
|
rqs.setWALPosition(serverName1, queue1, getFileName("file1", 0), 11 * 100,
|
||||||
|
ImmutableMap.of(region0, 899L, region1, 1001L));
|
||||||
|
assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
|
||||||
|
assertEquals(1001L, rqs.getLastSequenceId(region1, queue1));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
|
protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
import org.apache.hadoop.hbase.util.Pair;
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
|
@ -44,6 +46,8 @@ import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
||||||
|
|
||||||
@Category({ ReplicationTests.class, MediumTests.class })
|
@Category({ ReplicationTests.class, MediumTests.class })
|
||||||
public class TestZKReplicationQueueStorage {
|
public class TestZKReplicationQueueStorage {
|
||||||
|
|
||||||
|
@ -215,10 +219,11 @@ public class TestZKReplicationQueueStorage {
|
||||||
assertEquals(1, v1 - v0);
|
assertEquals(1, v1 - v0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
|
private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
|
||||||
return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
|
return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
|
||||||
|
|
||||||
private int called = 0;
|
private int called = 0;
|
||||||
|
private int getLastSeqIdOpIndex = 0;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected int getQueuesZNodeCversion() throws KeeperException {
|
protected int getQueuesZNodeCversion() throws KeeperException {
|
||||||
|
@ -227,12 +232,26 @@ public class TestZKReplicationQueueStorage {
|
||||||
}
|
}
|
||||||
return called;
|
return called;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
|
||||||
|
String peerId) throws KeeperException {
|
||||||
|
Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
|
||||||
|
if (getLastSeqIdOpIndex < 100) {
|
||||||
|
// Let the ZNode version increase.
|
||||||
|
String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
|
||||||
|
ZKUtil.createWithParents(zookeeper, path);
|
||||||
|
ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L));
|
||||||
|
}
|
||||||
|
getLastSeqIdOpIndex++;
|
||||||
|
return oldPair;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
|
public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
|
||||||
ZKReplicationQueueStorage storage = createWithUnstableCversion();
|
ZKReplicationQueueStorage storage = createWithUnstableVersion();
|
||||||
storage.addWAL(getServerName(0), "1", "file");
|
storage.addWAL(getServerName(0), "1", "file");
|
||||||
// This should return eventually when cversion stabilizes
|
// This should return eventually when cversion stabilizes
|
||||||
Set<String> allWals = storage.getAllWALs();
|
Set<String> allWals = storage.getAllWALs();
|
||||||
|
@ -243,7 +262,7 @@ public class TestZKReplicationQueueStorage {
|
||||||
// For HBASE-14621
|
// For HBASE-14621
|
||||||
@Test
|
@Test
|
||||||
public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
|
public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
|
||||||
ZKReplicationQueueStorage storage = createWithUnstableCversion();
|
ZKReplicationQueueStorage storage = createWithUnstableVersion();
|
||||||
storage.addPeerToHFileRefs("1");
|
storage.addPeerToHFileRefs("1");
|
||||||
Path p = new Path("/test");
|
Path p = new Path("/test");
|
||||||
storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
|
storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
|
||||||
|
@ -253,6 +272,29 @@ public class TestZKReplicationQueueStorage {
|
||||||
assertThat(allHFileRefs, hasItems("test"));
|
assertThat(allHFileRefs, hasItems("test"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For HBASE-20138
|
||||||
|
@Test
|
||||||
|
public void testSetWALPositionBadVersion() throws IOException, ReplicationException {
|
||||||
|
ZKReplicationQueueStorage storage = createWithUnstableVersion();
|
||||||
|
ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000);
|
||||||
|
assertTrue(storage.getAllQueues(serverName1).isEmpty());
|
||||||
|
String queue1 = "1";
|
||||||
|
String fileName = getFileName("file1", 0);
|
||||||
|
String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6";
|
||||||
|
storage.addWAL(serverName1, queue1, fileName);
|
||||||
|
|
||||||
|
List<String> wals1 = storage.getWALsInQueue(serverName1, queue1);
|
||||||
|
assertEquals(1, wals1.size());
|
||||||
|
|
||||||
|
assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName));
|
||||||
|
// This should return eventually when data version stabilizes
|
||||||
|
storage.setWALPosition(serverName1, queue1, fileName, 100,
|
||||||
|
ImmutableMap.of(encodedRegionName, 120L));
|
||||||
|
|
||||||
|
assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName));
|
||||||
|
assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRegionsZNodeLayout() throws Exception {
|
public void testRegionsZNodeLayout() throws Exception {
|
||||||
String peerId = "1";
|
String peerId = "1";
|
||||||
|
|
|
@ -1500,10 +1500,17 @@ public final class ZKUtil {
|
||||||
/**
|
/**
|
||||||
* @return a setData ZKUtilOp
|
* @return a setData ZKUtilOp
|
||||||
*/
|
*/
|
||||||
public static ZKUtilOp setData(String path, byte [] data) {
|
public static ZKUtilOp setData(String path, byte[] data) {
|
||||||
return new SetData(path, data);
|
return new SetData(path, data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return a setData ZKUtilOp
|
||||||
|
*/
|
||||||
|
public static ZKUtilOp setData(String path, byte[] data, int version) {
|
||||||
|
return new SetData(path, data, version);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return path to znode where the ZKOp will occur
|
* @return path to znode where the ZKOp will occur
|
||||||
*/
|
*/
|
||||||
|
@ -1578,17 +1585,28 @@ public final class ZKUtil {
|
||||||
* ZKUtilOp representing setData in ZooKeeper
|
* ZKUtilOp representing setData in ZooKeeper
|
||||||
*/
|
*/
|
||||||
public static final class SetData extends ZKUtilOp {
|
public static final class SetData extends ZKUtilOp {
|
||||||
private byte [] data;
|
private byte[] data;
|
||||||
|
private int version = -1;
|
||||||
|
|
||||||
private SetData(String path, byte [] data) {
|
private SetData(String path, byte[] data) {
|
||||||
super(path);
|
super(path);
|
||||||
this.data = data;
|
this.data = data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private SetData(String path, byte[] data, int version) {
|
||||||
|
super(path);
|
||||||
|
this.data = data;
|
||||||
|
this.version = version;
|
||||||
|
}
|
||||||
|
|
||||||
public byte[] getData() {
|
public byte[] getData() {
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getVersion() {
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean equals(Object o) {
|
public boolean equals(Object o) {
|
||||||
if (this == o) {
|
if (this == o) {
|
||||||
|
@ -1599,13 +1617,15 @@ public final class ZKUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
SetData op = (SetData) o;
|
SetData op = (SetData) o;
|
||||||
return getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
|
return getPath().equals(op.getPath()) && Arrays.equals(data, op.data)
|
||||||
|
&& getVersion() == op.getVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
int ret = getPath().hashCode();
|
int ret = getPath().hashCode();
|
||||||
return ret * 31 + Bytes.hashCode(data);
|
ret = ret * 31 + Bytes.hashCode(data);
|
||||||
|
return ret * 31 + Integer.hashCode(version);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1626,8 +1646,8 @@ public final class ZKUtil {
|
||||||
DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
|
DeleteNodeFailSilent dnfs = (DeleteNodeFailSilent)op;
|
||||||
return Op.delete(dnfs.getPath(), -1);
|
return Op.delete(dnfs.getPath(), -1);
|
||||||
} else if (op instanceof SetData) {
|
} else if (op instanceof SetData) {
|
||||||
SetData sd = (SetData)op;
|
SetData sd = (SetData) op;
|
||||||
return Op.setData(sd.getPath(), sd.getData(), -1);
|
return Op.setData(sd.getPath(), sd.getData(), sd.getVersion());
|
||||||
} else {
|
} else {
|
||||||
throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
|
throw new UnsupportedOperationException("Unexpected ZKUtilOp type: "
|
||||||
+ op.getClass().getName());
|
+ op.getClass().getName());
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.zookeeper;
|
package org.apache.hadoop.hbase.zookeeper;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.ZKTests;
|
import org.apache.hadoop.hbase.testclassification.ZKTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.ZooDefs;
|
import org.apache.zookeeper.ZooDefs;
|
||||||
|
@ -46,6 +48,7 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
|
|
||||||
@Category({ ZKTests.class, MediumTests.class })
|
@Category({ ZKTests.class, MediumTests.class })
|
||||||
|
@ -117,6 +120,28 @@ public class TestZKUtil {
|
||||||
assertNull(ZKUtil.getDataNoWatch(ZKW, "/l1/l2", null));
|
assertNull(ZKUtil.getDataNoWatch(ZKW, "/l1/l2", null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getZNodeDataVersion(String znode) throws KeeperException {
|
||||||
|
Stat stat = new Stat();
|
||||||
|
ZKUtil.getDataNoWatch(ZKW, znode, stat);
|
||||||
|
return stat.getVersion();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSetDataWithVersion() throws Exception {
|
||||||
|
ZKUtil.createWithParents(ZKW, "/s1/s2/s3");
|
||||||
|
int v0 = getZNodeDataVersion("/s1/s2/s3");
|
||||||
|
assertEquals(0, v0);
|
||||||
|
|
||||||
|
ZKUtil.setData(ZKW, "/s1/s2/s3", Bytes.toBytes(12L));
|
||||||
|
int v1 = getZNodeDataVersion("/s1/s2/s3");
|
||||||
|
assertEquals(1, v1);
|
||||||
|
|
||||||
|
ZKUtil.multiOrSequential(ZKW,
|
||||||
|
ImmutableList.of(ZKUtilOp.setData("/s1/s2/s3", Bytes.toBytes(13L), v1)), false);
|
||||||
|
int v2 = getZNodeDataVersion("/s1/s2/s3");
|
||||||
|
assertEquals(2, v2);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A test for HBASE-3238
|
* A test for HBASE-3238
|
||||||
* @throws IOException A connection attempt to zk failed
|
* @throws IOException A connection attempt to zk failed
|
||||||
|
|
Loading…
Reference in New Issue