HBASE-13885 ZK watches leaks during snapshots.
This commit is contained in:
parent
a10a82a8ff
commit
ce2fd2c58c
|
@ -275,7 +275,10 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs {
|
||||||
ForeignException ee = null;
|
ForeignException ee = null;
|
||||||
try {
|
try {
|
||||||
byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
|
byte[] data = ZKUtil.getData(zkProc.getWatcher(), abortNode);
|
||||||
if (!ProtobufUtil.isPBMagicPrefix(data)) {
|
if (data == null || data.length == 0) {
|
||||||
|
// ignore
|
||||||
|
return;
|
||||||
|
} else if (!ProtobufUtil.isPBMagicPrefix(data)) {
|
||||||
LOG.warn("Got an error notification for op:" + abortNode
|
LOG.warn("Got an error notification for op:" + abortNode
|
||||||
+ " but we can't read the information. Killing the procedure.");
|
+ " but we can't read the information. Killing the procedure.");
|
||||||
// we got a remote exception, but we can't describe it
|
// we got a remote exception, but we can't describe it
|
||||||
|
|
|
@ -318,7 +318,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
// figure out the data we need to pass
|
// figure out the data we need to pass
|
||||||
ForeignException ee;
|
ForeignException ee;
|
||||||
try {
|
try {
|
||||||
if (!ProtobufUtil.isPBMagicPrefix(data)) {
|
if (data == null || data.length == 0) {
|
||||||
|
// ignore
|
||||||
|
return;
|
||||||
|
} else if (!ProtobufUtil.isPBMagicPrefix(data)) {
|
||||||
String msg = "Illegally formatted data in abort node for proc " + opName
|
String msg = "Illegally formatted data in abort node for proc " + opName
|
||||||
+ ". Killing the procedure.";
|
+ ". Killing the procedure.";
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
|
|
|
@ -283,6 +283,11 @@ public abstract class ZKProcedureUtil
|
||||||
// TODO This is potentially racy since not atomic. update when we support zk that has multi
|
// TODO This is potentially racy since not atomic. update when we support zk that has multi
|
||||||
LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
|
LOG.info("Clearing all znodes for procedure " + procedureName + "including nodes "
|
||||||
+ acquiredZnode + " " + reachedZnode + " " + abortZnode);
|
+ acquiredZnode + " " + reachedZnode + " " + abortZnode);
|
||||||
|
|
||||||
|
// Make sure we trigger the watches on these nodes by creating them. (HBASE-13885)
|
||||||
|
ZKUtil.createAndFailSilent(watcher, getAcquiredBarrierNode(procedureName));
|
||||||
|
ZKUtil.createAndFailSilent(watcher, getAbortZNode(procedureName));
|
||||||
|
|
||||||
ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
|
ZKUtil.deleteNodeRecursively(watcher, getAcquiredBarrierNode(procedureName));
|
||||||
ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
|
ZKUtil.deleteNodeRecursively(watcher, getReachedBarrierNode(procedureName));
|
||||||
ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
|
ZKUtil.deleteNodeRecursively(watcher, getAbortZNode(procedureName));
|
||||||
|
|
Loading…
Reference in New Issue