diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
index 30b964f7531..1f220221d12 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ProcedureMember.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Collection;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
@@ -39,10 +38,6 @@ import com.google.common.collect.MapMaker;
* Process to kick off and manage a running {@link Subprocedure} on a member. This is the
* specialized part of a {@link Procedure} that actually does procedure type-specific work
* and reports back to the coordinator as it completes each phase.
- *
- * If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
- * currently running subprocedures are notify to failed since there is no longer a way to reach any
- * other members or coordinators since the rpcs are down.
*/
@InterfaceAudience.Private
public class ProcedureMember implements Closeable {
@@ -213,16 +208,17 @@ public class ProcedureMember implements Closeable {
* other members since we cannot reach them anymore.
* @param message description of the error
* @param cause the actual cause of the failure
- *
- * TODO i'm tempted to just remove this code completely and treat it like any other abort.
- * Implementation wise, if this happens it is a ZK failure which means the RS will abort.
+ * @param procName the name of the procedure we'd cancel due to the error.
*/
- public void controllerConnectionFailure(final String message, final IOException cause) {
- Collection toNotify = subprocs.values();
+ public void controllerConnectionFailure(final String message, final Throwable cause,
+ final String procName) {
LOG.error(message, cause);
- for (Subprocedure sub : toNotify) {
- // TODO notify the elements, if they aren't null
- sub.cancel(message, cause);
+ if (procName == null) {
+ return;
+ }
+ Subprocedure toNotify = subprocs.get(procName);
+ if (toNotify != null) {
+ toNotify.cancel(message, cause);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
index 9ef5d232d7a..892733828e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/Subprocedure.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
+import org.apache.zookeeper.KeeperException;
/**
* Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator
@@ -106,8 +107,12 @@ abstract public class Subprocedure implements Callable {
LOG.debug("Was remote foreign exception, not redispatching error", ee);
return;
}
-
- // if it is local, then send it to the coordinator
+ // if this is a local KeeperException, don't attempt to notify other members
+ if (ee.getCause() instanceof KeeperException) {
+ LOG.debug("Was KeeperException, not redispatching error", ee);
+ return;
+ }
+ // if it is other local error, then send it to the coordinator
try {
rpcs.sendMemberAborted(Subprocedure.this, ee);
} catch (IOException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
index 387c2e45f45..6ac2a479189 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/ZKProcedureMemberRpcs.java
@@ -143,7 +143,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to list children for abort node:"
- + zkController.getAbortZnode(), new IOException(e));
+ + zkController.getAbortZnode(), e, null);
}
}
@@ -160,7 +160,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
}
} catch (KeeperException e) {
member.controllerConnectionFailure("General failure when watching for new procedures",
- new IOException(e));
+ e, null);
}
if (runningProcedures == null) {
LOG.debug("No running procedures.");
@@ -192,7 +192,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
- + ") for procedure :" + opName, new IOException(e));
+ + ") for procedure :" + opName, e, opName);
return;
}
@@ -220,10 +220,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
- new IOException(e));
+ e, opName);
} catch (InterruptedException e) {
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
- new IOException(e));
+ e, opName);
Thread.currentThread().interrupt();
}
}
@@ -252,7 +252,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
}
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
- + procName + " and member: " + memberName, new IOException(e));
+ + procName + " and member: " + memberName, e, procName);
}
}
@@ -274,7 +274,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
ProtobufUtil.prependPBMagic(data));
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to post zk node:" + joinPath
- + " to join procedure barrier.", new IOException(e));
+ + " to join procedure barrier.", e, procName);
}
}
@@ -301,7 +301,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
// that case we should still get an error for that procedure anyways
zkController.logZKTree(zkController.getBaseZnode());
member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
- + " to abort procedure", new IOException(e));
+ + " to abort procedure", e, procName);
}
}
@@ -338,7 +338,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
this.member.receiveAbortProcedure(opName, ee);
} catch (KeeperException e) {
member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
- + zkController.getAbortZnode(), new IOException(e));
+ + zkController.getAbortZnode(), e, opName);
} catch (InterruptedException e) {
LOG.warn("abort already in progress", e);
Thread.currentThread().interrupt();