HBASE-11537 Avoid synchronization on instances of ConcurrentMap (Mike Drob)

This commit is contained in:
Jonathan M Hsieh 2014-07-18 15:40:10 -07:00
parent 209dd6dcfe
commit 5f4e85d3f9
2 changed files with 46 additions and 55 deletions

View File

@ -24,7 +24,6 @@ import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -145,7 +144,6 @@ public class ProcedureCoordinator {
String procName = proc.getName(); String procName = proc.getName();
// make sure we aren't already running a procedure of that name // make sure we aren't already running a procedure of that name
synchronized (procedures) {
Procedure oldProc = procedures.get(procName); Procedure oldProc = procedures.get(procName);
if (oldProc != null) { if (oldProc != null) {
// procedures are always eventually completed on both successful and failed execution // procedures are always eventually completed on both successful and failed execution
@ -153,40 +151,41 @@ public class ProcedureCoordinator {
if (!oldProc.isCompleted()) { if (!oldProc.isCompleted()) {
LOG.warn("Procedure " + procName + " currently running. Rejecting new request"); LOG.warn("Procedure " + procName + " currently running. Rejecting new request");
return false; return false;
} } else {
else {
LOG.debug("Procedure " + procName LOG.debug("Procedure " + procName
+ " was in running list but was completed. Accepting new attempt."); + " was in running list but was completed. Accepting new attempt.");
procedures.remove(procName); if (!procedures.remove(procName, oldProc)) {
LOG.warn("Procedure " + procName
+ " has been resubmitted by another thread. Rejecting this request.");
return false;
}
} }
} catch (ForeignException e) { } catch (ForeignException e) {
LOG.debug("Procedure " + procName LOG.debug("Procedure " + procName
+ " was in running list but has exception. Accepting new attempt."); + " was in running list but has exception. Accepting new attempt.");
procedures.remove(procName); if (!procedures.remove(procName, oldProc)) {
LOG.warn("Procedure " + procName
+ " has been resubmitted by another thread. Rejecting this request.");
return false;
} }
} }
} }
// kick off the procedure's execution in a separate thread // kick off the procedure's execution in a separate thread
Future<Void> f = null;
try { try {
synchronized (procedures) { if (this.procedures.putIfAbsent(procName, proc) == null) {
this.procedures.put(procName, proc); this.pool.submit(proc);
f = this.pool.submit(proc);
}
return true; return true;
} else {
LOG.error("Another thread has submitted procedure '" + procName + "'. Ignoring this attempt.");
return false;
}
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error and " + LOG.warn("Procedure " + procName + " rejected by execution pool. Propagating error.", e);
"cancelling operation.", e);
// Remove the procedure from the list since is not started // Remove the procedure from the list since is not started
this.procedures.remove(procName); this.procedures.remove(procName, proc);
// the thread pool is full and we can't run the procedure // the thread pool is full and we can't run the procedure
proc.receive(new ForeignException(procName, e)); proc.receive(new ForeignException(procName, e));
// cancel procedure proactively
if (f != null) {
f.cancel(true);
}
} }
return false; return false;
} }
@ -217,14 +216,12 @@ public class ProcedureCoordinator {
*/ */
public void abortProcedure(String procName, ForeignException reason) { public void abortProcedure(String procName, ForeignException reason) {
// if we know about the Procedure, notify it // if we know about the Procedure, notify it
synchronized(procedures) {
Procedure proc = procedures.get(procName); Procedure proc = procedures.get(procName);
if (proc == null) { if (proc == null) {
return; return;
} }
proc.receive(reason); proc.receive(reason);
} }
}
/** /**
* Exposed for hooking with unit tests. * Exposed for hooking with unit tests.

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue; import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -139,41 +138,36 @@ public class ProcedureMember implements Closeable {
} }
// make sure we aren't already running an subprocedure of that name // make sure we aren't already running an subprocedure of that name
Subprocedure rsub; Subprocedure rsub = subprocs.get(procName);
synchronized (subprocs) {
rsub = subprocs.get(procName);
}
if (rsub != null) { if (rsub != null) {
if (!rsub.isComplete()) { if (!rsub.isComplete()) {
LOG.error("Subproc '" + procName + "' is already running. Bailing out"); LOG.error("Subproc '" + procName + "' is already running. Bailing out");
return false; return false;
} }
LOG.warn("A completed old subproc " + procName + " is still present, removing"); LOG.warn("A completed old subproc " + procName + " is still present, removing");
subprocs.remove(procName); if (!subprocs.remove(procName, rsub)) {
LOG.error("Another thread has replaced existing subproc '" + procName + "'. Bailing out");
return false;
}
} }
LOG.debug("Submitting new Subprocedure:" + procName); LOG.debug("Submitting new Subprocedure:" + procName);
// kick off the subprocedure // kick off the subprocedure
Future<Void> future = null;
try { try {
synchronized (subprocs) { if (subprocs.putIfAbsent(procName, subproc) == null) {
subprocs.put(procName, subproc); this.pool.submit(subproc);
}
future = this.pool.submit(subproc);
return true; return true;
} catch (RejectedExecutionException e) { } else {
synchronized (subprocs) { LOG.error("Another thread has submitted subproc '" + procName + "'. Bailing out");
subprocs.remove(procName); return false;
} }
} catch (RejectedExecutionException e) {
subprocs.remove(procName, subproc);
// the thread pool is full and we can't run the subprocedure // the thread pool is full and we can't run the subprocedure
String msg = "Subprocedure pool is full!"; String msg = "Subprocedure pool is full!";
subproc.cancel(msg, e.getCause()); subproc.cancel(msg, e.getCause());
// cancel all subprocedures proactively
if (future != null) {
future.cancel(true);
}
} }
LOG.error("Failed to start subprocedure '" + procName + "'"); LOG.error("Failed to start subprocedure '" + procName + "'");
@ -182,7 +176,7 @@ public class ProcedureMember implements Closeable {
/** /**
* Notification that procedure coordinator has reached the global barrier * Notification that procedure coordinator has reached the global barrier
* @param procName name of the subprocedure that should start running the the in-barrier phase * @param procName name of the subprocedure that should start running the in-barrier phase
*/ */
public void receivedReachedGlobalBarrier(String procName) { public void receivedReachedGlobalBarrier(String procName) {
Subprocedure subproc = subprocs.get(procName); Subprocedure subproc = subprocs.get(procName);