diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java index ae3310fe4b8..adf94efaa46 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Procedure.java @@ -293,7 +293,7 @@ public class Procedure implements Callable, ForeignExceptionListener { * @param member */ public void barrierAcquiredByMember(String member) { - LOG.debug("member: '" + member + "' joining prepared barrier for procedure '" + procName + LOG.debug("member: '" + member + "' joining acquired barrier for procedure '" + procName + "' on coordinator"); if (this.acquiringMembers.contains(member)) { synchronized (joinBarrierLock) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java index dbeac4fb3ae..fdb454ba565 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureCoordinator.java @@ -164,14 +164,15 @@ public class ProcedureCoordinator { Future f = null; try { synchronized (procedures) { - f = this.pool.submit(proc); - // if everything got started properly, we can add it known running procedures this.procedures.put(procName, proc); + f = this.pool.submit(proc); } return true; } catch (RejectedExecutionException e) { LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error and " + "cancelling operation.", e); + // Remove the procedure from the list since is not started + this.procedures.remove(procName); // the thread pool is full and we can't run the procedure proc.receive(new ForeignException(procName, e)); @@ -258,9 +259,12 @@ public class ProcedureCoordinator { */ void memberAcquiredBarrier(String procName, final String member) { Procedure proc = procedures.get(procName); - if (proc != null) { - proc.barrierAcquiredByMember(member); + if (proc == null) { + LOG.warn("Member '"+ member +"' is trying to acquire an unknown procedure '"+ procName +"'"); + return; } + + proc.barrierAcquiredByMember(member); } /** @@ -271,9 +275,11 @@ public class ProcedureCoordinator { */ void memberFinishedBarrier(String procName, final String member) { Procedure proc = procedures.get(procName); - if (proc != null) { - proc.barrierReleasedByMember(member); + if (proc == null) { + LOG.warn("Member '"+ member +"' is trying to release an unknown procedure '"+ procName +"'"); + return; } + proc.barrierReleasedByMember(member); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java index 3f5ae1a53ac..56c6d5b4450 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureCoordinatorRpcs.java @@ -182,6 +182,8 @@ public class ZKProcedureCoordinatorRpcs implements ProcedureCoordinatorRpcs { ZKUtil.getNodeName(path)); } else if (isAbortPathNode(path)) { abort(path); + } else { + LOG.debug("Ignoring created notification for node:" + path); } } };