HBASE-13217 Procedure fails due to ZK issue
This commit is contained in:
parent
6213fa2fce
commit
2b56164628
|
@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.procedure;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
@ -39,10 +38,6 @@ import com.google.common.collect.MapMaker;
|
||||||
* Process to kick off and manage a running {@link Subprocedure} on a member. This is the
|
* Process to kick off and manage a running {@link Subprocedure} on a member. This is the
|
||||||
* specialized part of a {@link Procedure} that actually does procedure type-specific work
|
* specialized part of a {@link Procedure} that actually does procedure type-specific work
|
||||||
* and reports back to the coordinator as it completes each phase.
|
* and reports back to the coordinator as it completes each phase.
|
||||||
* <p>
|
|
||||||
* If there is a connection error ({@link #controllerConnectionFailure(String, IOException)}), all
|
|
||||||
* currently running subprocedures are notify to failed since there is no longer a way to reach any
|
|
||||||
* other members or coordinators since the rpcs are down.
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class ProcedureMember implements Closeable {
|
public class ProcedureMember implements Closeable {
|
||||||
|
@ -213,16 +208,17 @@ public class ProcedureMember implements Closeable {
|
||||||
* other members since we cannot reach them anymore.
|
* other members since we cannot reach them anymore.
|
||||||
* @param message description of the error
|
* @param message description of the error
|
||||||
* @param cause the actual cause of the failure
|
* @param cause the actual cause of the failure
|
||||||
*
|
* @param procName the name of the procedure we'd cancel due to the error.
|
||||||
* TODO i'm tempted to just remove this code completely and treat it like any other abort.
|
|
||||||
* Implementation wise, if this happens it is a ZK failure which means the RS will abort.
|
|
||||||
*/
|
*/
|
||||||
public void controllerConnectionFailure(final String message, final IOException cause) {
|
public void controllerConnectionFailure(final String message, final Throwable cause,
|
||||||
Collection<Subprocedure> toNotify = subprocs.values();
|
final String procName) {
|
||||||
LOG.error(message, cause);
|
LOG.error(message, cause);
|
||||||
for (Subprocedure sub : toNotify) {
|
if (procName == null) {
|
||||||
// TODO notify the elements, if they aren't null
|
return;
|
||||||
sub.cancel(message, cause);
|
}
|
||||||
|
Subprocedure toNotify = subprocs.get(procName);
|
||||||
|
if (toNotify != null) {
|
||||||
|
toNotify.cancel(message, cause);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
|
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionListener;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
|
||||||
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
|
import org.apache.hadoop.hbase.errorhandling.TimeoutExceptionInjector;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator
|
* Distributed procedure member's Subprocedure. A procedure is sarted on a ProcedureCoordinator
|
||||||
|
@ -106,8 +107,12 @@ abstract public class Subprocedure implements Callable<Void> {
|
||||||
LOG.debug("Was remote foreign exception, not redispatching error", ee);
|
LOG.debug("Was remote foreign exception, not redispatching error", ee);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
// if this is a local KeeperException, don't attempt to notify other members
|
||||||
// if it is local, then send it to the coordinator
|
if (ee.getCause() instanceof KeeperException) {
|
||||||
|
LOG.debug("Was KeeperException, not redispatching error", ee);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// if it is other local error, then send it to the coordinator
|
||||||
try {
|
try {
|
||||||
rpcs.sendMemberAborted(Subprocedure.this, ee);
|
rpcs.sendMemberAborted(Subprocedure.this, ee);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
|
|
@ -143,7 +143,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to list children for abort node:"
|
member.controllerConnectionFailure("Failed to list children for abort node:"
|
||||||
+ zkController.getAbortZnode(), new IOException(e));
|
+ zkController.getAbortZnode(), e, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -160,7 +160,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("General failure when watching for new procedures",
|
member.controllerConnectionFailure("General failure when watching for new procedures",
|
||||||
new IOException(e));
|
e, null);
|
||||||
}
|
}
|
||||||
if (runningProcedures == null) {
|
if (runningProcedures == null) {
|
||||||
LOG.debug("No running procedures.");
|
LOG.debug("No running procedures.");
|
||||||
|
@ -192,7 +192,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
|
member.controllerConnectionFailure("Failed to get the abort znode (" + abortZNode
|
||||||
+ ") for procedure :" + opName, new IOException(e));
|
+ ") for procedure :" + opName, e, opName);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,10 +220,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
|
sendMemberAborted(subproc, new ForeignException(getMemberName(), ise));
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
|
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
|
||||||
new IOException(e));
|
e, opName);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
|
member.controllerConnectionFailure("Failed to get data for new procedure:" + opName,
|
||||||
new IOException(e));
|
e, opName);
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -252,7 +252,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
|
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
|
||||||
+ procName + " and member: " + memberName, new IOException(e));
|
+ procName + " and member: " + memberName, e, procName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,7 +274,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
ProtobufUtil.prependPBMagic(data));
|
ProtobufUtil.prependPBMagic(data));
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to post zk node:" + joinPath
|
member.controllerConnectionFailure("Failed to post zk node:" + joinPath
|
||||||
+ " to join procedure barrier.", new IOException(e));
|
+ " to join procedure barrier.", e, procName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -301,7 +301,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
// that case we should still get an error for that procedure anyways
|
// that case we should still get an error for that procedure anyways
|
||||||
zkController.logZKTree(zkController.getBaseZnode());
|
zkController.logZKTree(zkController.getBaseZnode());
|
||||||
member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
|
member.controllerConnectionFailure("Failed to post zk node:" + procAbortZNode
|
||||||
+ " to abort procedure", new IOException(e));
|
+ " to abort procedure", e, procName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -338,7 +338,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
this.member.receiveAbortProcedure(opName, ee);
|
this.member.receiveAbortProcedure(opName, ee);
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
|
member.controllerConnectionFailure("Failed to get data for abort znode:" + abortZNode
|
||||||
+ zkController.getAbortZnode(), new IOException(e));
|
+ zkController.getAbortZnode(), e, opName);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
LOG.warn("abort already in progress", e);
|
LOG.warn("abort already in progress", e);
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
|
Loading…
Reference in New Issue