HBASE-20939 There will be race when we call suspendIfNotReady and then throw ProcedureSuspendedException

This commit is contained in:
zhangduo 2018-07-27 17:26:40 +08:00
parent 35c598db93
commit 8bfdb19e85
3 changed files with 17 additions and 14 deletions

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -43,7 +42,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
public class IdLock {
/** An entry returned to the client as a lock object */
public static class Entry {
public static final class Entry {
private final long id;
private int numWaiters;
private boolean locked = true;

View File

@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.IdLock;
import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
@ -313,6 +314,14 @@ public class ProcedureExecutor<TEnvironment> {
private final boolean checkOwnerSet;
// To prevent concurrent execution of the same procedure.
// For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
// procedure is woken up before we finish the suspend which causes the same procedures to be
// executed in parallel. This does lead to some problems, see HBASE-20939&HBASE-20949, and is also
// a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
// execution of the same procedure.
private final IdLock procExecutionLock = new IdLock();
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
final ProcedureStore store) {
this(conf, environment, store, new SimpleProcedureScheduler());
@ -1496,14 +1505,7 @@ public class ProcedureExecutor<TEnvironment> {
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
// The exception is caught below and then we hurry to the exit without disturbing state. The
// idea is that the processing of this procedure will be unsuspended later by an external event
// such the report of a region open. TODO: Currently, its possible for two worker threads
// to be working on the same procedure concurrently (locking in procedures is NOT about
// concurrency but about tying an entity to a procedure; i.e. a region to a particular
// procedure instance). This can make for issues if both threads are changing state.
// See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
// in RegionTransitionProcedure#reportTransition for example of Procedure putting
// itself back on the scheduler making it possible for two threads running against
// the one Procedure. Might be ok if they are both doing different, idempotent sections.
// such the report of a region open.
boolean suspended = false;
// Whether to 're-' -execute; run through the loop again.
@ -1798,12 +1800,14 @@ public class ProcedureExecutor<TEnvironment> {
LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
runningCount, activeCount);
executionStartTime.set(EnvironmentEdgeManager.currentTime());
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
try {
executeProcedure(proc);
} catch (AssertionError e) {
LOG.info("ASSERT pid=" + proc.getProcId(), e);
throw e;
} finally {
procExecutionLock.releaseLockEntry(lockEntry);
activeCount = activeExecutorCount.decrementAndGet();
runningCount = store.setRunningProcedureCount(activeCount);
LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),

View File

@ -15,15 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure2;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@InterfaceAudience.Private
@InterfaceStability.Stable
public class ProcedureSuspendedException extends ProcedureException {
private static final long serialVersionUID = -8328419627678496269L;
/** default constructor */
public ProcedureSuspendedException() {
super();