HBASE-7523 Snapshot attempt with the name of a previously taken snapshot fails sometimes
git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290@1445837 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
232fa82451
commit
43ddbac484
|
@ -74,7 +74,7 @@ public class EnabledTableSnapshotHandler extends TakeSnapshotHandler {
|
||||||
Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
|
Procedure proc = coordinator.startProcedure(this.monitor, this.snapshot.getName(),
|
||||||
this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
|
this.snapshot.toByteArray(), Lists.newArrayList(regionServers));
|
||||||
if (proc == null) {
|
if (proc == null) {
|
||||||
String msg = "Failed to submit distribute procedure for snapshot '"
|
String msg = "Failed to submit distributed procedure for snapshot '"
|
||||||
+ snapshot.getName() + "'";
|
+ snapshot.getName() + "'";
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
throw new HBaseSnapshotException(msg);
|
throw new HBaseSnapshotException(msg);
|
||||||
|
|
|
@ -310,7 +310,8 @@ public class Procedure implements Callable<Void>, ForeignExceptionListener {
|
||||||
}
|
}
|
||||||
if (removed) {
|
if (removed) {
|
||||||
LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
|
LOG.debug("Member: '" + member + "' released barrier for procedure'" + procName
|
||||||
+ "', counting down latch");
|
+ "', counting down latch. Waiting for " + releasedBarrierLatch.getCount()
|
||||||
|
+ " more");
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
|
LOG.warn("Member: '" + member + "' released barrier for procedure'" + procName
|
||||||
+ "', but we weren't waiting on it to release!");
|
+ "', but we weren't waiting on it to release!");
|
||||||
|
|
|
@ -113,8 +113,15 @@ public class ProcedureCoordinator {
|
||||||
|
|
||||||
// make sure we aren't already running an procedure of that name
|
// make sure we aren't already running an procedure of that name
|
||||||
synchronized (procedures) {
|
synchronized (procedures) {
|
||||||
if (procedures.get(procName) != null) {
|
Procedure oldProc = procedures.get(procName);
|
||||||
return false;
|
if (oldProc != null) {
|
||||||
|
// procedures are always eventually completed on both successful and failed execution
|
||||||
|
if (oldProc.completedLatch.getCount() != 0) {
|
||||||
|
LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
LOG.debug("Procedure " + procName + " was in running list but was completed. Accepting new attempt.");
|
||||||
|
procedures.remove(procName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,6 +135,8 @@ public class ProcedureCoordinator {
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
} catch (RejectedExecutionException e) {
|
} catch (RejectedExecutionException e) {
|
||||||
|
LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error and " +
|
||||||
|
"cancelling operation.", e);
|
||||||
// the thread pool is full and we can't run the procedure
|
// the thread pool is full and we can't run the procedure
|
||||||
proc.receive(new ForeignException(procName, e));
|
proc.receive(new ForeignException(procName, e));
|
||||||
|
|
||||||
|
|
|
@ -185,7 +185,6 @@ abstract public class Subprocedure implements Callable<Void> {
|
||||||
|
|
||||||
// make sure we didn't get an external exception
|
// make sure we didn't get an external exception
|
||||||
rethrowException();
|
rethrowException();
|
||||||
LOG.debug("Subprocedure '" + barrierName + "' locally completed");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
String msg = null;
|
String msg = null;
|
||||||
if (e instanceof InterruptedException) {
|
if (e instanceof InterruptedException) {
|
||||||
|
|
|
@ -87,7 +87,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
String parent = ZKUtil.getParent(path);
|
String parent = ZKUtil.getParent(path);
|
||||||
// if its the end barrier, the procedure can be completed
|
// if its the end barrier, the procedure can be completed
|
||||||
if (parent.equals(this.reachedZnode)) {
|
if (parent.equals(this.reachedZnode)) {
|
||||||
recievedReachedGlobalBarrier(path);
|
receivedReachedGlobalBarrier(path);
|
||||||
return;
|
return;
|
||||||
} else if (parent.equals(this.abortZnode)) {
|
} else if (parent.equals(this.abortZnode)) {
|
||||||
abort(path);
|
abort(path);
|
||||||
|
@ -104,10 +104,10 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
public void nodeChildrenChanged(String path) {
|
public void nodeChildrenChanged(String path) {
|
||||||
LOG.info("Received children changed event:" + path);
|
LOG.info("Received children changed event:" + path);
|
||||||
if (path.equals(this.acquiredZnode)) {
|
if (path.equals(this.acquiredZnode)) {
|
||||||
LOG.info("Recieved start event.");
|
LOG.info("Received start event.");
|
||||||
waitForNewProcedures();
|
waitForNewProcedures();
|
||||||
} else if (path.equals(this.abortZnode)) {
|
} else if (path.equals(this.abortZnode)) {
|
||||||
LOG.info("Recieved abort event.");
|
LOG.info("Received abort event.");
|
||||||
watchForAbortedProcedures();
|
watchForAbortedProcedures();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
* Pass along the procedure global barrier notification to any listeners
|
* Pass along the procedure global barrier notification to any listeners
|
||||||
* @param path full znode path that cause the notification
|
* @param path full znode path that cause the notification
|
||||||
*/
|
*/
|
||||||
private void recievedReachedGlobalBarrier(String path) {
|
private void receivedReachedGlobalBarrier(String path) {
|
||||||
LOG.debug("Recieved reached global barrier:" + path);
|
LOG.debug("Recieved reached global barrier:" + path);
|
||||||
String procName = ZKUtil.getNodeName(path);
|
String procName = ZKUtil.getNodeName(path);
|
||||||
this.member.receivedReachedGlobalBarrier(procName);
|
this.member.receivedReachedGlobalBarrier(procName);
|
||||||
|
@ -244,7 +244,7 @@ public class ZKProcedureMemberRpcs implements ProcedureMemberRpcs {
|
||||||
String reachedBarrier = zkController.getReachedBarrierNode(procName);
|
String reachedBarrier = zkController.getReachedBarrierNode(procName);
|
||||||
LOG.debug("Watch for global barrier reached:" + reachedBarrier);
|
LOG.debug("Watch for global barrier reached:" + reachedBarrier);
|
||||||
if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
|
if (ZKUtil.watchAndCheckExists(zkController.getWatcher(), reachedBarrier)) {
|
||||||
recievedReachedGlobalBarrier(reachedBarrier);
|
receivedReachedGlobalBarrier(reachedBarrier);
|
||||||
}
|
}
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
|
member.controllerConnectionFailure("Failed to acquire barrier for procedure: "
|
||||||
|
|
Loading…
Reference in New Issue