HBASE-3136 Stale reads from ZK can break the atomic CAS operations we have in ZKAssign
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1026909 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1d6743ae7f
commit
a574082d85
|
@ -610,6 +610,8 @@ Release 0.21.0 - Unreleased
|
||||||
attributes
|
attributes
|
||||||
HBASE-3143 Adding the tests' hbase-site.xml to the jar breaks some clients
|
HBASE-3143 Adding the tests' hbase-site.xml to the jar breaks some clients
|
||||||
HBASE-3139 Server shutdown processor stuck because meta not online
|
HBASE-3139 Server shutdown processor stuck because meta not online
|
||||||
|
HBASE-3136 Stale reads from ZK can break the atomic CAS operations we
|
||||||
|
have in ZKAssign
|
||||||
|
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -239,6 +239,7 @@ public class ZKAssign {
|
||||||
EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
|
EventType.M_ZK_REGION_OFFLINE, region.getRegionName(), serverName);
|
||||||
synchronized(zkw.getNodes()) {
|
synchronized(zkw.getNodes()) {
|
||||||
String node = getNodeName(zkw, region.getEncodedName());
|
String node = getNodeName(zkw, region.getEncodedName());
|
||||||
|
zkw.sync(node);
|
||||||
zkw.getNodes().add(node);
|
zkw.getNodes().add(node);
|
||||||
int version = ZKUtil.checkExists(zkw, node);
|
int version = ZKUtil.checkExists(zkw, node);
|
||||||
if(version == -1) {
|
if(version == -1) {
|
||||||
|
@ -380,6 +381,7 @@ public class ZKAssign {
|
||||||
LOG.debug(zkw.prefix("Deleting existing unassigned " +
|
LOG.debug(zkw.prefix("Deleting existing unassigned " +
|
||||||
"node for " + regionName + " that is in expected state " + expectedState));
|
"node for " + regionName + " that is in expected state " + expectedState));
|
||||||
String node = getNodeName(zkw, regionName);
|
String node = getNodeName(zkw, regionName);
|
||||||
|
zkw.sync(node);
|
||||||
Stat stat = new Stat();
|
Stat stat = new Stat();
|
||||||
byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
|
byte [] bytes = ZKUtil.getDataNoWatch(zkw, node, stat);
|
||||||
if(bytes == null) {
|
if(bytes == null) {
|
||||||
|
@ -645,6 +647,7 @@ public class ZKAssign {
|
||||||
}
|
}
|
||||||
|
|
||||||
String node = getNodeName(zkw, encoded);
|
String node = getNodeName(zkw, encoded);
|
||||||
|
zkw.sync(node);
|
||||||
|
|
||||||
// Read existing data of the node
|
// Read existing data of the node
|
||||||
Stat stat = new Stat();
|
Stat stat = new Stat();
|
||||||
|
|
|
@ -317,6 +317,22 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Forces a synchronization of this ZooKeeper client connection.
|
||||||
|
* <p>
|
||||||
|
* Executing this method before running other methods will ensure that the
|
||||||
|
* subsequent operations are up-to-date and consistent as of the time that
|
||||||
|
* the sync is complete.
|
||||||
|
* <p>
|
||||||
|
* This is used for compareAndSwap type operations where we need to read the
|
||||||
|
* data of an existing node and delete or transition that node, utilizing the
|
||||||
|
* previously read version and data. We want to ensure that the version read
|
||||||
|
* is up-to-date from when we begin the operation.
|
||||||
|
*/
|
||||||
|
public void sync(String path) {
|
||||||
|
this.zooKeeper.sync(path, null, null);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the set of already watched unassigned nodes.
|
* Get the set of already watched unassigned nodes.
|
||||||
* @return
|
* @return
|
||||||
|
|
Loading…
Reference in New Issue