HBASE-20939 There will be race when we call suspendIfNotReady and then throw ProcedureSuspendedException
This commit is contained in:
parent
80b40a3b58
commit
7178a98258
|
@ -1,5 +1,4 @@
|
||||||
/*
|
/**
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -43,7 +42,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
public class IdLock {
|
public class IdLock {
|
||||||
|
|
||||||
/** An entry returned to the client as a lock object */
|
/** An entry returned to the client as a lock object */
|
||||||
public static class Entry {
|
public static final class Entry {
|
||||||
private final long id;
|
private final long id;
|
||||||
private int numWaiters;
|
private int numWaiters;
|
||||||
private boolean locked = true;
|
private boolean locked = true;
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator
|
||||||
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.IdLock;
|
||||||
import org.apache.hadoop.hbase.util.NonceKey;
|
import org.apache.hadoop.hbase.util.NonceKey;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -313,6 +314,14 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
|
|
||||||
private final boolean checkOwnerSet;
|
private final boolean checkOwnerSet;
|
||||||
|
|
||||||
|
// To prevent concurrent execution of the same procedure.
|
||||||
|
// For some rare cases, especially if the procedure uses ProcedureEvent, it is possible that the
|
||||||
|
// procedure is woken up before we finish the suspend, which causes the same procedure to be
|
||||||
|
// executed in parallel. This does lead to some problems (see HBASE-20939 and HBASE-20949), and is also
|
||||||
|
// a bit confusing to the developers. So here we introduce this lock to prevent the concurrent
|
||||||
|
// execution of the same procedure.
|
||||||
|
private final IdLock procExecutionLock = new IdLock();
|
||||||
|
|
||||||
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
|
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
|
||||||
final ProcedureStore store) {
|
final ProcedureStore store) {
|
||||||
this(conf, environment, store, new SimpleProcedureScheduler());
|
this(conf, environment, store, new SimpleProcedureScheduler());
|
||||||
|
@ -1496,14 +1505,7 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
|
// Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException.
|
||||||
// The exception is caught below and then we hurry to the exit without disturbing state. The
|
// The exception is caught below and then we hurry to the exit without disturbing state. The
|
||||||
// idea is that the processing of this procedure will be unsuspended later by an external event
|
// idea is that the processing of this procedure will be unsuspended later by an external event
|
||||||
// such the report of a region open. TODO: Currently, its possible for two worker threads
|
// such as the report of a region open.
|
||||||
// to be working on the same procedure concurrently (locking in procedures is NOT about
|
|
||||||
// concurrency but about tying an entity to a procedure; i.e. a region to a particular
|
|
||||||
// procedure instance). This can make for issues if both threads are changing state.
|
|
||||||
// See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent());
|
|
||||||
// in RegionTransitionProcedure#reportTransition for example of Procedure putting
|
|
||||||
// itself back on the scheduler making it possible for two threads running against
|
|
||||||
// the one Procedure. Might be ok if they are both doing different, idempotent sections.
|
|
||||||
boolean suspended = false;
|
boolean suspended = false;
|
||||||
|
|
||||||
// Whether to 're-' -execute; run through the loop again.
|
// Whether to 're-' -execute; run through the loop again.
|
||||||
|
@ -1798,12 +1800,14 @@ public class ProcedureExecutor<TEnvironment> {
|
||||||
LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
|
LOG.trace("Execute pid={} runningCount={}, activeCount={}", proc.getProcId(),
|
||||||
runningCount, activeCount);
|
runningCount, activeCount);
|
||||||
executionStartTime.set(EnvironmentEdgeManager.currentTime());
|
executionStartTime.set(EnvironmentEdgeManager.currentTime());
|
||||||
|
IdLock.Entry lockEntry = procExecutionLock.getLockEntry(proc.getProcId());
|
||||||
try {
|
try {
|
||||||
executeProcedure(proc);
|
executeProcedure(proc);
|
||||||
} catch (AssertionError e) {
|
} catch (AssertionError e) {
|
||||||
LOG.info("ASSERT pid=" + proc.getProcId(), e);
|
LOG.info("ASSERT pid=" + proc.getProcId(), e);
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
|
procExecutionLock.releaseLockEntry(lockEntry);
|
||||||
activeCount = activeExecutorCount.decrementAndGet();
|
activeCount = activeExecutorCount.decrementAndGet();
|
||||||
runningCount = store.setRunningProcedureCount(activeCount);
|
runningCount = store.setRunningProcedureCount(activeCount);
|
||||||
LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
|
LOG.trace("Halt pid={} runningCount={}, activeCount={}", proc.getProcId(),
|
||||||
|
|
|
@ -15,15 +15,15 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Stable
|
|
||||||
public class ProcedureSuspendedException extends ProcedureException {
|
public class ProcedureSuspendedException extends ProcedureException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = -8328419627678496269L;
|
||||||
|
|
||||||
/** default constructor */
|
/** default constructor */
|
||||||
public ProcedureSuspendedException() {
|
public ProcedureSuspendedException() {
|
||||||
super();
|
super();
|
||||||
|
|
Loading…
Reference in New Issue