HBASE-7616 NPE in ZKProcedure.nodeCreated
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445855 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
bc37ac1553
commit
6a25400409
|
@ -39,7 +39,7 @@ import com.google.protobuf.InvalidProtocolBufferException;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
public static final Log LOG = LogFactory.getLog(ZKProcedureUtil.class);
|
public static final Log LOG = LogFactory.getLog(ZKProcedureCoordinatorRpcs.class);
|
||||||
private ZKProcedureUtil zkProc = null;
|
private ZKProcedureUtil zkProc = null;
|
||||||
protected ProcedureCoordinator coordinator = null; // if started this should be non-null
|
protected ProcedureCoordinator coordinator = null; // if started this should be non-null
|
||||||
|
|
||||||
|
@ -165,20 +165,20 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
|
this.zkProc = new ZKProcedureUtil(watcher, procedureType, coordName) {
|
||||||
@Override
|
@Override
|
||||||
public void nodeCreated(String path) {
|
public void nodeCreated(String path) {
|
||||||
if (!zkProc.isInProcedurePath(path)) return;
|
if (!isInProcedurePath(path)) return;
|
||||||
LOG.debug("Node created: " + path);
|
LOG.debug("Node created: " + path);
|
||||||
logZKTree(this.baseZNode);
|
logZKTree(this.baseZNode);
|
||||||
if (zkProc.isAcquiredPathNode(path)) {
|
if (isAcquiredPathNode(path)) {
|
||||||
// node wasn't present when we created the watch so zk event triggers acquire
|
// node wasn't present when we created the watch so zk event triggers acquire
|
||||||
listener.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
|
listener.memberAcquiredBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
|
||||||
}
|
}
|
||||||
if (zkProc.isReachedPathNode(path)) {
|
if (isReachedPathNode(path)) {
|
||||||
// node wasn't present when we created the watch so zk event triggers the finished barrier.
|
// node wasn't present when we created the watch so zk event triggers the finished barrier.
|
||||||
|
|
||||||
// TODO Nothing enforces that acquire and reached znodes from showing up in the wrong order.
|
// TODO Nothing enforces that acquire and reached znodes from showing up in the wrong order.
|
||||||
listener.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
|
listener.memberFinishedBarrier(ZKUtil.getNodeName(ZKUtil.getParent(path)), ZKUtil.getNodeName(path));
|
||||||
}
|
}
|
||||||
if (zkProc.isAbortPathNode(path)) {
|
if (isAbortPathNode(path)) {
|
||||||
abort(path);
|
abort(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -74,31 +74,33 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
|
this.zkController = new ZKProcedureUtil(watcher, procType, memberName) {
|
||||||
@Override
|
@Override
|
||||||
public void nodeCreated(String path) {
|
public void nodeCreated(String path) {
|
||||||
if (path.startsWith(this.baseZNode)) {
|
if (!isInProcedurePath(path)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
LOG.info("Received created event:" + path);
|
LOG.info("Received created event:" + path);
|
||||||
// if it is a simple start/end/abort then we just rewatch the node
|
// if it is a simple start/end/abort then we just rewatch the node
|
||||||
if (path.equals(this.acquiredZnode)) {
|
if (isAcquiredNode(path)) {
|
||||||
waitForNewProcedures();
|
waitForNewProcedures();
|
||||||
return;
|
return;
|
||||||
} else if (path.equals(this.abortZnode)) {
|
} else if (isAbortNode(path)) {
|
||||||
watchForAbortedProcedures();
|
watchForAbortedProcedures();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
String parent = ZKUtil.getParent(path);
|
String parent = ZKUtil.getParent(path);
|
||||||
// if its the end barrier, the procedure can be completed
|
// if its the end barrier, the procedure can be completed
|
||||||
if (parent.equals(this.reachedZnode)) {
|
if (isReachedNode(parent)) {
|
||||||
receivedReachedGlobalBarrier(path);
|
receivedReachedGlobalBarrier(path);
|
||||||
return;
|
return;
|
||||||
} else if (parent.equals(this.abortZnode)) {
|
} else if (isAbortNode(parent)) {
|
||||||
abort(path);
|
abort(path);
|
||||||
return;
|
return;
|
||||||
} else if (parent.equals(this.acquiredZnode)) {
|
} else if (isAcquiredNode(parent)) {
|
||||||
startNewSubprocedure(path);
|
startNewSubprocedure(path);
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Ignoring created notification for node:" + path);
|
LOG.debug("Ignoring created notification for node:" + path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void nodeChildrenChanged(String path) {
|
public void nodeChildrenChanged(String path) {
|
||||||
|
|
|
@ -179,24 +179,47 @@ public abstract class ZKProcedureUtil
|
||||||
*
|
*
|
||||||
* @return true if starts with baseZnode
|
* @return true if starts with baseZnode
|
||||||
*/
|
*/
|
||||||
public boolean isInProcedurePath(String path) {
|
boolean isInProcedurePath(String path) {
|
||||||
return path.startsWith(baseZNode);
|
return path.startsWith(baseZNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this the exact procedure barrier acquired znode
|
||||||
|
*/
|
||||||
|
boolean isAcquiredNode(String path) {
|
||||||
|
return path.equals(acquiredZnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this in the procedure barrier acquired znode path
|
* Is this in the procedure barrier acquired znode path
|
||||||
*/
|
*/
|
||||||
public boolean isAcquiredPathNode(String path) {
|
boolean isAcquiredPathNode(String path) {
|
||||||
return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode);
|
return path.startsWith(this.acquiredZnode) && !path.equals(acquiredZnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this the exact procedure barrier reached znode
|
||||||
|
*/
|
||||||
|
boolean isReachedNode(String path) {
|
||||||
|
return path.equals(reachedZnode);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this in the procedure barrier reached znode path
|
* Is this in the procedure barrier reached znode path
|
||||||
*/
|
*/
|
||||||
public boolean isReachedPathNode(String path) {
|
boolean isReachedPathNode(String path) {
|
||||||
return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode);
|
return path.startsWith(this.reachedZnode) && !path.equals(reachedZnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is this in the procedure barrier abort znode path
|
||||||
|
*/
|
||||||
|
boolean isAbortNode(String path) {
|
||||||
|
return path.equals(abortZnode);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Is this in the procedure barrier abort znode path
|
* Is this in the procedure barrier abort znode path
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue