HBASE-17067 Procedure v2 - remove zklock/tryLock and use wait/wake (Matteo Bertozzi)

This is an amalgam of https://reviews.apache.org/r/54435/ and
9c14863594

Removes notion of suspend/resume from procedure. Instead have the below lock states
and just unschedule if lock is not yet available

 LOCK_ACQUIRED should be returned when the proc has the lock and the proc is ready to execute.
 LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework
   should take care of readding the procedure back to the runnable set for retry
 LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will take care of
  readding the procedure back to the runnable set when the lock is available.

Side benefit is being able to undo a bunch of synchronization around
procedure management.

Signed-off-by: Michael Stack <stack@apache.org>
This commit is contained in:
Michael Stack 2017-01-19 14:11:53 -08:00
parent ba4a926b62
commit 980c8c2047
23 changed files with 430 additions and 426 deletions

View File

@ -289,12 +289,10 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
}
protected void suspendProcedure(final ProcedureEventQueue event, final Procedure procedure) {
procedure.suspend();
event.suspendProcedure(procedure);
}
protected void wakeProcedure(final Procedure procedure) {
procedure.resume();
push(procedure, /* addFront= */ true, /* notify= */false);
}

View File

@ -59,7 +59,13 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceStability.Evolving
public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
public static final long NO_PROC_ID = -1;
public static final int NO_TIMEOUT = -1;
protected static final int NO_TIMEOUT = -1;
public enum LockState {
LOCK_ACQUIRED, // lock acquired and ready to execute
LOCK_YIELD_WAIT, // lock not acquired, framework needs to yield
LOCK_EVENT_WAIT, // lock not acquired, an event will yield the procedure
}
// unchanged after initialization
private NonceKey nonceKey = null;
@ -80,9 +86,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
private volatile byte[] result = null;
// TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks
private boolean suspended = false;
/**
* The main code of the procedure. It must be idempotent since execute()
* may be called multiple time in case of machine failure in the middle
@ -142,14 +145,23 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
/**
* The user should override this method, and try to take a lock if necessary.
* A lock can be anything, and it is up to the implementor.
* Example: in our Master we can execute request in parallel for different tables
* create t1 and create t2 can be executed at the same time.
* anything else on t1/t2 is queued waiting that specific table create to happen.
*
* @return true if the lock was acquired and false otherwise
* <p>Example: in our Master we can execute request in parallel for different tables.
* We can create t1 and create t2 and this can be executed at the same time.
* Anything else on t1/t2 is queued waiting that specific table create to happen.
*
* <p>There are 3 LockState:
* <ul><li>LOCK_ACQUIRED should be returned when the proc has the lock and the proc is
* ready to execute.</li>
* <li>LOCK_YIELD_WAIT should be returned when the proc has not the lock and the framework
* should take care of readding the procedure back to the runnable set for retry</li>
* <li>LOCK_EVENT_WAIT should be returned when the proc has not the lock and someone will
* take care of readding the procedure back to the runnable set when the lock is available.
* </li></ul>
* @return the lock state as described above.
*/
protected boolean acquireLock(final TEnvironment env) {
return true;
protected LockState acquireLock(final TEnvironment env) {
return LockState.LOCK_ACQUIRED;
}
/**
@ -301,9 +313,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
*/
protected void toStringState(StringBuilder builder) {
builder.append(getState());
if (isSuspended()) {
builder.append("|SUSPENDED");
}
}
/**
@ -494,23 +503,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
// just because the procedure can get scheduled on different executor threads on each step.
// ==============================================================================================
/**
* @return true if the procedure is in a suspended state,
* waiting for the resources required to execute the procedure will become available.
*/
public synchronized boolean isSuspended() {
return suspended;
}
public synchronized void suspend() {
suspended = true;
}
public synchronized void resume() {
assert isSuspended() : this + " expected suspended state, got " + state;
suspended = false;
}
/**
* @return true if the procedure is in a RUNNABLE state.
*/
@ -737,7 +729,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
* Internal method called by the ProcedureExecutor that starts the user-level code acquireLock().
*/
@InterfaceAudience.Private
protected boolean doAcquireLock(final TEnvironment env) {
protected LockState doAcquireLock(final TEnvironment env) {
return acquireLock(env);
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.ProcedureInfo;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
@ -255,6 +256,7 @@ public class ProcedureExecutor<TEnvironment> {
private final AtomicBoolean running = new AtomicBoolean(false);
private final TEnvironment environment;
private final ProcedureStore store;
private final boolean checkOwnerSet;
public ProcedureExecutor(final Configuration conf, final TEnvironment environment,
@ -1090,17 +1092,34 @@ public class ProcedureExecutor<TEnvironment> {
if (!procStack.acquire(proc)) {
if (procStack.setRollback()) {
// we have the 'rollback-lock' we can start rollingback
if (!executeRollback(rootProcId, procStack)) {
procStack.unsetRollback();
scheduler.yield(proc);
switch (executeRollback(rootProcId, procStack)) {
case LOCK_ACQUIRED:
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
procStack.unsetRollback();
break;
case LOCK_EVENT_WAIT:
procStack.unsetRollback();
break;
default:
throw new UnsupportedOperationException();
}
} else {
// if we can't rollback means that some child is still running.
// the rollback will be executed after all the children are done.
// If the procedure was never executed, remove and mark it as rolledback.
if (!proc.wasExecuted()) {
if (!executeRollback(proc)) {
scheduler.yield(proc);
switch (executeRollback(proc)) {
case LOCK_ACQUIRED:
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
break;
case LOCK_EVENT_WAIT:
break;
default:
throw new UnsupportedOperationException();
}
}
}
@ -1109,11 +1128,19 @@ public class ProcedureExecutor<TEnvironment> {
// Execute the procedure
assert proc.getState() == ProcedureState.RUNNABLE : proc;
if (acquireLock(proc)) {
execProcedure(procStack, proc);
releaseLock(proc, false);
} else {
scheduler.yield(proc);
switch (acquireLock(proc)) {
case LOCK_ACQUIRED:
execProcedure(procStack, proc);
releaseLock(proc, false);
break;
case LOCK_YIELD_WAIT:
scheduler.yield(proc);
break;
case LOCK_EVENT_WAIT:
// someone will wake us up when the lock is available
break;
default:
throw new UnsupportedOperationException();
}
procStack.release(proc);
@ -1139,13 +1166,13 @@ public class ProcedureExecutor<TEnvironment> {
} while (procStack.isFailed());
}
private boolean acquireLock(final Procedure proc) {
private LockState acquireLock(final Procedure proc) {
final TEnvironment env = getEnvironment();
// hasLock() is used in conjunction with holdLock().
// This allows us to not rewrite or carry around the hasLock() flag
// for every procedure. the hasLock() have meaning only if holdLock() is true.
if (proc.holdLock(env) && proc.hasLock(env)) {
return true;
return LockState.LOCK_ACQUIRED;
}
return proc.doAcquireLock(env);
}
@ -1164,7 +1191,7 @@ public class ProcedureExecutor<TEnvironment> {
* Once the procedure is rolledback, the root-procedure will be visible as
* finished to user, and the result will be the fatal exception.
*/
private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) {
private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) {
final Procedure rootProc = procedures.get(rootProcId);
RemoteProcedureException exception = rootProc.getException();
if (exception == null) {
@ -1181,13 +1208,15 @@ public class ProcedureExecutor<TEnvironment> {
while (stackTail --> 0) {
final Procedure proc = subprocStack.get(stackTail);
if (!reuseLock && !acquireLock(proc)) {
LockState lockState;
if (!reuseLock && (lockState = acquireLock(proc)) != LockState.LOCK_ACQUIRED) {
// can't take a lock on the procedure, add the root-proc back on the
// queue waiting for the lock availability
return false;
return lockState;
}
boolean abortRollback = !executeRollback(proc);
lockState = executeRollback(proc);
boolean abortRollback = lockState != LockState.LOCK_ACQUIRED;
abortRollback |= !isRunning() || !store.isRunning();
// If the next procedure is the same to this one
@ -1201,14 +1230,14 @@ public class ProcedureExecutor<TEnvironment> {
// allows to kill the executor before something is stored to the wal.
// useful to test the procedure recovery.
if (abortRollback) {
return false;
return lockState;
}
subprocStack.remove(stackTail);
// if the procedure is kind enough to pass the slot to someone else, yield
if (proc.isYieldAfterExecutionStep(getEnvironment())) {
return false;
return LockState.LOCK_YIELD_WAIT;
}
if (proc != rootProc) {
@ -1221,7 +1250,7 @@ public class ProcedureExecutor<TEnvironment> {
" exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) +
" exception=" + exception.getMessage());
procedureFinished(rootProc);
return true;
return LockState.LOCK_ACQUIRED;
}
/**
@ -1229,17 +1258,17 @@ public class ProcedureExecutor<TEnvironment> {
* It updates the store with the new state (stack index)
* or will remove completly the procedure in case it is a child.
*/
private boolean executeRollback(final Procedure proc) {
private LockState executeRollback(final Procedure proc) {
try {
proc.doRollback(getEnvironment());
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Roll back attempt failed for " + proc, e);
}
return false;
return LockState.LOCK_YIELD_WAIT;
} catch (InterruptedException e) {
handleInterruptedException(proc, e);
return false;
return LockState.LOCK_YIELD_WAIT;
} catch (Throwable e) {
// Catch NullPointerExceptions or similar errors...
LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e);
@ -1250,7 +1279,7 @@ public class ProcedureExecutor<TEnvironment> {
if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
LOG.debug("TESTING: Kill before store update");
stop();
return false;
return LockState.LOCK_YIELD_WAIT;
}
if (proc.removeStackIndex()) {
@ -1270,7 +1299,7 @@ public class ProcedureExecutor<TEnvironment> {
store.update(proc);
}
return true;
return LockState.LOCK_ACQUIRED;
}
/**

View File

@ -27,7 +27,6 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.procedure2;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.ArrayList;
import java.util.Set;
@ -67,27 +68,46 @@ public class ProcedureTestingUtility {
});
}
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor)
throws Exception {
restart(procExecutor, null, true);
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor) throws Exception {
restart(procExecutor, false, true, null, null);
}
public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
ProcedureStore procStore = procExecutor.getStore();
int storeThreads = procExecutor.getCorePoolSize();
int execThreads = procExecutor.getCorePoolSize();
// stop
procExecutor.stop();
procExecutor.join();
procStore.stop(false);
// nothing running...
if (beforeStartAction != null) {
beforeStartAction.run();
public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor,
final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted,
final Callable<Void> stopAction, final Callable<Void> startAction)
throws Exception {
final ProcedureStore procStore = procExecutor.getStore();
final int storeThreads = procExecutor.getCorePoolSize();
final int execThreads = procExecutor.getCorePoolSize();
final ProcedureExecutor.Testing testing = procExecutor.testing;
if (avoidTestKillDuringRestart) {
procExecutor.testing = null;
}
// stop
LOG.info("RESTART - Stop");
procExecutor.stop();
procStore.stop(false);
if (stopAction != null) {
stopAction.call();
}
procExecutor.join();
procExecutor.getScheduler().clear();
// nothing running...
// re-start
LOG.info("RESTART - Start");
procStore.start(storeThreads);
procExecutor.start(execThreads, failOnCorrupted);
if (startAction != null) {
startAction.call();
}
if (avoidTestKillDuringRestart) {
procExecutor.testing = testing;
}
}
public static void storeRestart(ProcedureStore procStore, ProcedureStore.ProcedureLoader loader)
@ -309,11 +329,11 @@ public class ProcedureTestingUtility {
public static <TEnv> void testRecoveryAndDoubleExecution(final ProcedureExecutor<TEnv> procExec,
final long procId, final boolean expectFailure, final Runnable customRestart)
throws Exception {
final Procedure proc = procExec.getProcedure(procId);
Procedure proc = procExec.getProcedure(procId);
waitProcedure(procExec, procId);
assertEquals(false, procExec.isRunning());
for (int i = 0; !procExec.isFinished(procId); ++i) {
proc = procExec.getProcedure(procId);
LOG.info("Restart " + i + " exec state: " + proc);
if (customRestart != null) {
customRestart.run();
@ -415,8 +435,8 @@ public class ProcedureTestingUtility {
// Mark acquire/release lock functions public for test uses.
@Override
public boolean acquireLock(Void env) {
return true;
public LockState acquireLock(Void env) {
return LockState.LOCK_ACQUIRED;
}
@Override

View File

@ -212,11 +212,12 @@ public class TestProcedureSuspended {
}
@Override
protected boolean acquireLock(final TestProcEnv env) {
protected LockState acquireLock(final TestProcEnv env) {
if ((hasLock = lock.compareAndSet(false, true))) {
LOG.info("ACQUIRE LOCK " + this + " " + (hasLock));
return LockState.LOCK_ACQUIRED;
}
return hasLock;
return LockState.LOCK_YIELD_WAIT;
}
@Override

View File

@ -1087,7 +1087,7 @@ public class HMaster extends HRegionServer implements MasterServices {
new MasterProcedureEnv.WALStoreLeaseRecovery(this));
procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this));
procedureExecutor = new ProcedureExecutor(conf, procEnv, procedureStore,
procEnv.getProcedureQueue());
procEnv.getProcedureScheduler());
configurationManager.registerObserver(procEnv);
final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,

View File

@ -289,7 +289,7 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
protected LockState acquireLock(final MasterProcedureEnv env) {
boolean ret = lock.acquireLock(env);
locked.set(ret);
hasLock = ret;
@ -298,8 +298,10 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
LOG.debug("LOCKED - " + toString());
}
lastHeartBeat.set(System.currentTimeMillis());
return LockState.LOCK_ACQUIRED;
}
return ret;
LOG.warn("Failed acquire LOCK " + toString() + "; YIELDING");
return LockState.LOCK_EVENT_WAIT;
}
@Override
@ -414,37 +416,43 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
private class TableExclusiveLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureScheduler().tryAcquireTableExclusiveLock(LockProcedure.this, tableName);
// We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
// to get the lock and false if you don't; i.e. you got the lock.
return !env.getProcedureScheduler().waitTableExclusiveLock(LockProcedure.this, tableName);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().releaseTableExclusiveLock(LockProcedure.this, tableName);
env.getProcedureScheduler().wakeTableExclusiveLock(LockProcedure.this, tableName);
}
}
private class TableSharedLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureScheduler().tryAcquireTableSharedLock(LockProcedure.this, tableName);
// We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
// to get the lock and false if you don't; i.e. you got the lock.
return !env.getProcedureScheduler().waitTableSharedLock(LockProcedure.this, tableName);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().releaseTableSharedLock(LockProcedure.this, tableName);
env.getProcedureScheduler().wakeTableSharedLock(LockProcedure.this, tableName);
}
}
private class NamespaceExclusiveLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
return env.getProcedureScheduler().tryAcquireNamespaceExclusiveLock(
// We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
// to get the lock and false if you don't; i.e. you got the lock.
return !env.getProcedureScheduler().waitNamespaceExclusiveLock(
LockProcedure.this, namespace);
}
@Override
public void releaseLock(final MasterProcedureEnv env) {
env.getProcedureScheduler().releaseNamespaceExclusiveLock(
env.getProcedureScheduler().wakeNamespaceExclusiveLock(
LockProcedure.this, namespace);
}
}
@ -452,6 +460,8 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
private class RegionExclusiveLock implements LockInterface {
@Override
public boolean acquireLock(final MasterProcedureEnv env) {
// We invert return from waitNamespaceExclusiveLock; it returns true if you HAVE TO WAIT
// to get the lock and false if you don't; i.e. you got the lock.
return !env.getProcedureScheduler().waitRegions(LockProcedure.this, tableName, regionInfos);
}
@ -460,4 +470,4 @@ public final class LockProcedure extends Procedure<MasterProcedureEnv>
env.getProcedureScheduler().wakeRegions(LockProcedure.this, tableName, regionInfos);
}
}
}
}

View File

@ -58,13 +58,16 @@ public abstract class AbstractStateMachineNamespaceProcedure<TState>
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseNamespaceExclusiveLock(this, getNamespaceName());
env.getProcedureScheduler().wakeNamespaceExclusiveLock(this, getNamespaceName());
}
}
}

View File

@ -75,14 +75,17 @@ public abstract class AbstractStateMachineTableProcedure<TState>
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return false;
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseTableExclusiveLock(this, getTableName());
env.getProcedureScheduler().wakeTableExclusiveLock(this, getTableName());
}
protected User getUser() {
@ -108,4 +111,4 @@ public abstract class AbstractStateMachineTableProcedure<TState>
throw new TableNotFoundException(getTableName());
}
}
}
}

View File

@ -160,16 +160,19 @@ public class CreateNamespaceProcedure
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
protected LockState acquireLock(final MasterProcedureEnv env) {
if (!env.getMasterServices().isInitialized()) {
// Namespace manager might not be ready if master is not fully initialized,
// return false to reject user namespace creation; return true for default
// and system namespace creation (this is part of master initialization).
if (!isBootstrapNamespace() && env.waitInitialized(this)) {
return false;
return LockState.LOCK_EVENT_WAIT;
}
}
return env.getProcedureQueue().tryAcquireNamespaceExclusiveLock(this, getNamespaceName());
if (env.getProcedureScheduler().waitNamespaceExclusiveLock(this, getNamespaceName())) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
@Override

View File

@ -216,11 +216,14 @@ public class CreateTableProcedure
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
protected LockState acquireLock(final MasterProcedureEnv env) {
if (!getTableName().isSystemTable() && env.waitInitialized(this)) {
return false;
return LockState.LOCK_EVENT_WAIT;
}
return env.getProcedureQueue().tryAcquireTableExclusiveLock(this, getTableName());
if (env.getProcedureScheduler().waitTableExclusiveLock(this, getTableName())) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
private boolean prepareCreate(final MasterProcedureEnv env) throws IOException {

View File

@ -121,11 +121,6 @@ public class MasterProcedureEnv implements ConfigurationObserver {
return master.getMasterCoprocessorHost();
}
@Deprecated
public MasterProcedureScheduler getProcedureQueue() {
return procSched;
}
public MasterProcedureScheduler getProcedureScheduler() {
return procSched;
}

View File

@ -112,8 +112,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
final Queue<T> queue, final Procedure proc, final boolean addFront) {
if (proc.isSuspended()) return;
queue.add(proc, addFront);
if (!queue.hasExclusiveLock() || queue.isLockOwner(proc.getProcId())) {
// if the queue was not remove for an xlock execution
@ -157,6 +155,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
final boolean xlockReq = rq.requireExclusiveLock(pollResult);
if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) {
// someone is already holding the lock (e.g. shared lock). avoid a yield
removeFromRunQueue(fairq, rq);
return null;
}
@ -177,7 +176,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
public void clearQueue() {
protected void clearQueue() {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
@ -460,7 +459,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
@Override
public synchronized boolean isAvailable() {
public boolean isAvailable() {
// if there are no items in the queue, or the namespace is locked.
// we can't execute operation on this table
if (isEmpty() || namespaceQueue.hasExclusiveLock()) {
@ -478,7 +477,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return true;
}
public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) {
public RegionEvent getRegionEvent(final HRegionInfo regionInfo) {
if (regionEventMap == null) {
regionEventMap = new HashMap<String, RegionEvent>();
}
@ -490,7 +489,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return event;
}
public synchronized void removeRegionEvent(final RegionEvent event) {
public void removeRegionEvent(final RegionEvent event) {
regionEventMap.remove(event.getRegionInfo().getEncodedName());
if (regionEventMap.isEmpty()) {
regionEventMap = null;
@ -511,30 +510,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
public boolean requireExclusiveLock(Procedure proc) {
TableProcedureInterface tpi = (TableProcedureInterface)proc;
switch (tpi.getTableOperationType()) {
case CREATE:
case DELETE:
case DISABLE:
case ENABLE:
return true;
case EDIT:
// we allow concurrent edit on the NS table
return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
case READ:
return false;
// region operations are using the shared-lock on the table
// and then they will grab an xlock on the region.
case SPLIT:
case MERGE:
case ASSIGN:
case UNASSIGN:
case REGION_EDIT:
return false;
default:
break;
}
throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType());
return requireTableExclusiveLock((TableProcedureInterface)proc);
}
}
@ -589,96 +565,139 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// ============================================================================
// Table Locking Helpers
// ============================================================================
private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
switch (proc.getTableOperationType()) {
case CREATE:
case DELETE:
case DISABLE:
case ENABLE:
return true;
case EDIT:
// we allow concurrent edit on the NS table
return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
case READ:
return false;
// region operations are using the shared-lock on the table
// and then they will grab an xlock on the region.
case SPLIT:
case MERGE:
case ASSIGN:
case UNASSIGN:
case REGION_EDIT:
return false;
default:
break;
}
throw new UnsupportedOperationException("unexpected type " +
proc.getTableOperationType());
}
/**
* Try to acquire the exclusive lock on the specified table.
* other operations in the table-queue will be executed after the lock is released.
* Suspend the procedure if the specified table is already locked.
* Other operations in the table-queue will be executed after the lock is released.
* @param procedure the procedure trying to acquire the lock
* @param table Table to lock
* @return true if we were able to acquire the lock on the table, otherwise false.
* @return true if the procedure has to wait for the table to be available
*/
public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) {
public boolean waitTableExclusiveLock(final Procedure procedure, final TableName table) {
schedLock();
try {
final TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) {
return false;
final TableQueue tableQueue = getTableQueue(table);
final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue();
if (!nsQueue.trySharedLock()) {
suspendProcedure(nsQueue.getEvent(), procedure);
return true;
}
if (!queue.tryExclusiveLock(procedure)) {
queue.getNamespaceQueue().releaseSharedLock();
return false;
if (!tableQueue.tryExclusiveLock(procedure)) {
nsQueue.releaseSharedLock();
suspendProcedure(tableQueue.getEvent(), procedure);
return true;
}
removeFromRunQueue(tableRunQueue, queue);
return true;
removeFromRunQueue(tableRunQueue, tableQueue);
return false;
} finally {
schedUnlock();
}
}
/**
* Release the exclusive lock taken with tryAcquireTableWrite()
* Wake the procedures waiting for the specified table
* @param procedure the procedure releasing the lock
* @param table the name of the table that has the exclusive lock
*/
public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
public void wakeTableExclusiveLock(final Procedure procedure, final TableName table) {
schedLock();
try {
final TableQueue queue = getTableQueue(table);
if (!queue.hasParentLock(procedure)) {
queue.releaseExclusiveLock(procedure);
final TableQueue tableQueue = getTableQueue(table);
int waitingCount = 0;
if (!tableQueue.hasParentLock(procedure)) {
tableQueue.releaseExclusiveLock(procedure);
waitingCount += popEventWaitingProcedures(tableQueue.getEvent());
}
queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue);
final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue();
if (nsQueue.releaseSharedLock()) {
waitingCount += popEventWaitingProcedures(nsQueue.getEvent());
}
addToRunQueue(tableRunQueue, tableQueue);
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
/**
* Try to acquire the shared lock on the specified table.
* Suspend the procedure if the specified table is already locked.
* other "read" operations in the table-queue may be executed concurrently,
* @param procedure the procedure trying to acquire the lock
* @param table Table to lock
* @return true if we were able to acquire the lock on the table, otherwise false.
* @return true if the procedure has to wait for the table to be available
*/
public boolean tryAcquireTableSharedLock(final Procedure procedure, final TableName table) {
return tryAcquireTableQueueSharedLock(procedure, table) != null;
public boolean waitTableSharedLock(final Procedure procedure, final TableName table) {
return waitTableQueueSharedLock(procedure, table) == null;
}
private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure,
final TableName table) {
private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) {
schedLock();
try {
final TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) {
final TableQueue tableQueue = getTableQueue(table);
final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue();
if (!nsQueue.trySharedLock()) {
suspendProcedure(nsQueue.getEvent(), procedure);
return null;
}
if (!queue.trySharedLock()) {
queue.getNamespaceQueue().releaseSharedLock();
if (!tableQueue.trySharedLock()) {
tableQueue.getNamespaceQueue().releaseSharedLock();
suspendProcedure(tableQueue.getEvent(), procedure);
return null;
}
return queue;
return tableQueue;
} finally {
schedUnlock();
}
}
/**
* Release the shared lock taken with tryAcquireTableRead()
* Wake the procedures waiting for the specified table
* @param procedure the procedure releasing the lock
* @param table the name of the table that has the shared lock
*/
public void releaseTableSharedLock(final Procedure procedure, final TableName table) {
public void wakeTableSharedLock(final Procedure procedure, final TableName table) {
schedLock();
try {
final TableQueue queue = getTableQueue(table);
if (queue.releaseSharedLock()) {
addToRunQueue(tableRunQueue, queue);
final TableQueue tableQueue = getTableQueue(table);
final NamespaceQueue nsQueue = tableQueue.getNamespaceQueue();
int waitingCount = 0;
if (tableQueue.releaseSharedLock()) {
addToRunQueue(tableRunQueue, tableQueue);
waitingCount += popEventWaitingProcedures(tableQueue.getEvent());
}
queue.getNamespaceQueue().releaseSharedLock();
if (nsQueue.releaseSharedLock()) {
waitingCount += popEventWaitingProcedures(nsQueue.getEvent());
}
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
@ -746,7 +765,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
queue = getTableQueueWithLock(table);
} else {
// acquire the table shared-lock
queue = tryAcquireTableQueueSharedLock(procedure, table);
queue = waitTableQueueSharedLock(procedure, table);
if (queue == null) return true;
}
@ -771,7 +790,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
if (!hasLock && !procedure.hasParent()) {
releaseTableSharedLock(procedure, table);
wakeTableSharedLock(procedure, table);
}
return !hasLock;
}
@ -822,13 +841,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
for (int i = numProcs - 1; i >= 0; --i) {
wakeProcedure(nextProcs[i]);
}
wakePollIfNeeded(numProcs);
if (!procedure.hasParent()) {
// release the table shared-lock.
// (if we have a parent, it is holding an xlock so we didn't take the shared-lock)
releaseTableSharedLock(procedure, table);
wakeTableSharedLock(procedure, table);
}
} finally {
schedUnlock();
@ -839,45 +856,52 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// Namespace Locking Helpers
// ============================================================================
/**
* Try to acquire the exclusive lock on the specified namespace.
* @see #releaseNamespaceExclusiveLock(Procedure,String)
* Suspend the procedure if the specified namespace is already locked.
* @see #wakeNamespaceExclusiveLock(Procedure,String)
* @param procedure the procedure trying to acquire the lock
* @param nsName Namespace to lock
* @return true if we were able to acquire the lock on the namespace, otherwise false.
* @return true if the procedure has to wait for the namespace to be available
*/
public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
schedLock();
try {
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
if (!tableQueue.trySharedLock()) return false;
NamespaceQueue nsQueue = getNamespaceQueue(nsName);
boolean hasLock = nsQueue.tryExclusiveLock(procedure);
if (!hasLock) {
tableQueue.releaseSharedLock();
final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
if (!tableQueue.trySharedLock()) {
suspendProcedure(tableQueue.getEvent(), procedure);
return true;
}
return hasLock;
final NamespaceQueue nsQueue = getNamespaceQueue(nsName);
if (!nsQueue.tryExclusiveLock(procedure)) {
tableQueue.releaseSharedLock();
suspendProcedure(nsQueue.getEvent(), procedure);
return true;
}
return false;
} finally {
schedUnlock();
}
}
/**
* Release the exclusive lock
* @see #tryAcquireNamespaceExclusiveLock(Procedure,String)
* Wake the procedures waiting for the specified namespace
* @see #waitNamespaceExclusiveLock(Procedure,String)
* @param procedure the procedure releasing the lock
* @param nsName the namespace that has the exclusive lock
*/
public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
public void wakeNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
schedLock();
try {
final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
final NamespaceQueue queue = getNamespaceQueue(nsName);
queue.releaseExclusiveLock(procedure);
final NamespaceQueue nsQueue = getNamespaceQueue(nsName);
int waitingCount = 0;
nsQueue.releaseExclusiveLock(procedure);
if (tableQueue.releaseSharedLock()) {
addToRunQueue(tableRunQueue, tableQueue);
waitingCount += popEventWaitingProcedures(tableQueue.getEvent());
}
waitingCount += popEventWaitingProcedures(nsQueue.getEvent());
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
@ -888,67 +912,45 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// ============================================================================
/**
* Try to acquire the exclusive lock on the specified server.
* @see #releaseServerExclusiveLock(Procedure,ServerName)
* @see #wakeServerExclusiveLock(Procedure,ServerName)
* @param procedure the procedure trying to acquire the lock
* @param serverName Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
* @return true if the procedure has to wait for the server to be available
*/
public boolean tryAcquireServerExclusiveLock(final Procedure procedure,
final ServerName serverName) {
public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
schedLock();
try {
ServerQueue queue = getServerQueue(serverName);
if (queue.tryExclusiveLock(procedure)) {
removeFromRunQueue(serverRunQueue, queue);
return true;
return false;
}
suspendProcedure(queue.getEvent(), procedure);
return true;
} finally {
schedUnlock();
}
return false;
}
/**
* Release the exclusive lock
* @see #tryAcquireServerExclusiveLock(Procedure,ServerName)
* Wake the procedures waiting for the specified server
* @see #waitServerExclusiveLock(Procedure,ServerName)
* @param procedure the procedure releasing the lock
* @param serverName the server that has the exclusive lock
*/
public void releaseServerExclusiveLock(final Procedure procedure,
final ServerName serverName) {
public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) {
schedLock();
try {
ServerQueue queue = getServerQueue(serverName);
final ServerQueue queue = getServerQueue(serverName);
queue.releaseExclusiveLock(procedure);
addToRunQueue(serverRunQueue, queue);
int waitingCount = popEventWaitingProcedures(queue.getEvent());
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
/**
* Try to acquire the shared lock on the specified server.
* @see #releaseServerSharedLock(Procedure,ServerName)
* @param procedure the procedure releasing the lock
* @param serverName Server to lock
* @return true if we were able to acquire the lock on the server, otherwise false.
*/
public boolean tryAcquireServerSharedLock(final Procedure procedure,
final ServerName serverName) {
return getServerQueueWithLock(serverName).trySharedLock();
}
/**
* Release the shared lock taken
* @see #tryAcquireServerSharedLock(Procedure,ServerName)
* @param procedure the procedure releasing the lock
* @param serverName the server that has the shared lock
*/
public void releaseServerSharedLock(final Procedure procedure,
final ServerName serverName) {
getServerQueueWithLock(serverName).releaseSharedLock();
}
// ============================================================================
// Generic Helpers
// ============================================================================
@ -965,8 +967,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
boolean isSuspended();
}
// TODO Why OK not having synchronized access and/or volatiles and
// sharedLock-- and sharedLock++? Is this accessed by one thread only?
// Write up the concurrency expectations. St.Ack 01/19/2017
private static abstract class Queue<TKey extends Comparable<TKey>>
extends AvlLinkedNode<Queue<TKey>> implements QueueInterface {
private final ProcedureEventQueue event;
private boolean suspended = false;
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
@ -982,6 +988,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
public Queue(TKey key, int priority) {
this.key = key;
this.priority = priority;
this.event = new ProcedureEventQueue();
}
protected TKey getKey() {
@ -992,6 +999,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return priority;
}
public ProcedureEventQueue getEvent() {
return event;
}
/**
* True if the queue is not in the run-queue and it is owned by an event.
*/
@ -1008,48 +1019,48 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// ======================================================================
// Read/Write Locking helpers
// ======================================================================
public synchronized boolean isLocked() {
public boolean isLocked() {
return hasExclusiveLock() || sharedLock > 0;
}
public synchronized boolean hasExclusiveLock() {
public boolean hasExclusiveLock() {
return this.exclusiveLockProcIdOwner != Long.MIN_VALUE;
}
public synchronized boolean trySharedLock() {
public boolean trySharedLock() {
if (hasExclusiveLock()) return false;
sharedLock++;
return true;
}
public synchronized boolean releaseSharedLock() {
public boolean releaseSharedLock() {
return --sharedLock == 0;
}
protected synchronized boolean isSingleSharedLock() {
protected boolean isSingleSharedLock() {
return sharedLock == 1;
}
public synchronized boolean isLockOwner(long procId) {
public boolean isLockOwner(long procId) {
return exclusiveLockProcIdOwner == procId;
}
public synchronized boolean hasParentLock(final Procedure proc) {
public boolean hasParentLock(final Procedure proc) {
return proc.hasParent() &&
(isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
}
public synchronized boolean hasLockAccess(final Procedure proc) {
public boolean hasLockAccess(final Procedure proc) {
return isLockOwner(proc.getProcId()) || hasParentLock(proc);
}
public synchronized boolean tryExclusiveLock(final Procedure proc) {
public boolean tryExclusiveLock(final Procedure proc) {
if (isLocked()) return hasLockAccess(proc);
exclusiveLockProcIdOwner = proc.getProcId();
return true;
}
public synchronized boolean releaseExclusiveLock(final Procedure proc) {
public boolean releaseExclusiveLock(final Procedure proc) {
if (isLockOwner(proc.getProcId())) {
exclusiveLockProcIdOwner = Long.MIN_VALUE;
return true;
@ -1059,7 +1070,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
// This should go away when we have the new AM and its events
// and we move xlock to the lock-event-queue.
public synchronized boolean isAvailable() {
public boolean isAvailable() {
return !hasExclusiveLock() && !isEmpty();
}

View File

@ -322,17 +322,19 @@ public class MergeTableRegionsProcedure
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) {
return false;
return LockState.LOCK_EVENT_WAIT;
}
return !env.getProcedureQueue().waitRegions(
this, getTableName(), regionsToMerge[0], regionsToMerge[1]);
return env.getProcedureScheduler().waitRegions(this, getTableName(),
regionsToMerge[0], regionsToMerge[1])?
LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().wakeRegions(this, getTableName(), regionsToMerge[0], regionsToMerge[1]);
env.getProcedureScheduler().wakeRegions(this, getTableName(),
regionsToMerge[0], regionsToMerge[1]);
}
@Override

View File

@ -562,14 +562,19 @@ implements ServerProcedureInterface {
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
if (env.waitServerCrashProcessingEnabled(this)) return false;
return env.getProcedureQueue().tryAcquireServerExclusiveLock(this, getServerName());
protected LockState acquireLock(final MasterProcedureEnv env) {
// TODO: Put this BACK AFTER AMv2 goes in!!!!
// if (env.waitFailoverCleanup(this)) return LockState.LOCK_EVENT_WAIT;
if (env.waitServerCrashProcessingEnabled(this)) return LockState.LOCK_EVENT_WAIT;
if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) {
return LockState.LOCK_EVENT_WAIT;
}
return LockState.LOCK_ACQUIRED;
}
@Override
protected void releaseLock(final MasterProcedureEnv env) {
env.getProcedureQueue().releaseServerExclusiveLock(this, getServerName());
env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
}
@Override

View File

@ -347,11 +347,12 @@ public class SplitTableRegionProcedure
}
@Override
protected boolean acquireLock(final MasterProcedureEnv env) {
protected LockState acquireLock(final MasterProcedureEnv env) {
if (env.waitInitialized(this)) {
return false;
return LockState.LOCK_EVENT_WAIT;
}
return !env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI);
return env.getProcedureScheduler().waitRegions(this, getTableName(), parentHRI)?
LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
}
@Override

View File

@ -59,7 +59,6 @@ public class TestLockManager {
private static final Log LOG = LogFactory.getLog(TestLockProcedure.class);
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final Configuration conf = UTIL.getConfiguration();
private static MasterServices masterServices;
private static String namespace = "namespace";

View File

@ -62,11 +62,11 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
+ "proportion of table:region ops is 1:regions_per_table. Default: "
+ DEFAULT_OPS_TYPE);
private int numTables;
private int regionsPerTable;
private int numOps;
private int numThreads;
private String opsType;
private int numTables = DEFAULT_NUM_TABLES;
private int regionsPerTable = DEFAULT_REGIONS_PER_TABLE;
private int numOps = DEFAULT_NUM_OPERATIONS;
private int numThreads = DEFAULT_NUM_THREADS;
private String opsType = DEFAULT_OPS_TYPE;
private MasterProcedureScheduler procedureScheduler;
// List of table/region procedures to schedule.
@ -83,10 +83,13 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
super(procId, hri.getTable(), TableOperationType.UNASSIGN, hri);
}
public boolean acquireLock(Void env) {
return !procedureScheduler.waitRegions(this, getTableName(), getRegionInfo());
@Override
public LockState acquireLock(Void env) {
return procedureScheduler.waitRegions(this, getTableName(), getRegionInfo())?
LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
}
@Override
public void releaseLock(Void env) {
procedureScheduler.wakeRegions(this, getTableName(), getRegionInfo());
}
@ -110,12 +113,15 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
super(procId, tableName, TableOperationType.EDIT);
}
public boolean acquireLock(Void env) {
return procedureScheduler.tryAcquireTableExclusiveLock(this, getTableName());
@Override
public LockState acquireLock(Void env) {
return procedureScheduler.waitTableExclusiveLock(this, getTableName())?
LockState.LOCK_EVENT_WAIT: LockState.LOCK_ACQUIRED;
}
@Override
public void releaseLock(Void env) {
procedureScheduler.releaseTableExclusiveLock(this, getTableName());
procedureScheduler.wakeTableExclusiveLock(this, getTableName());
}
}
@ -212,11 +218,15 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
continue;
}
if (proc.acquireLock(null)) {
completed.incrementAndGet();
proc.releaseLock(null);
} else {
procedureScheduler.yield(proc);
switch (proc.acquireLock(null)) {
case LOCK_ACQUIRED:
completed.incrementAndGet();
proc.releaseLock(null);
break;
case LOCK_YIELD_WAIT:
break;
case LOCK_EVENT_WAIT:
break;
}
if (completed.get() % 100000 == 0) {
System.out.println("Completed " + completed.get() + " procedures.");

View File

@ -18,10 +18,14 @@
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -45,17 +49,12 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.TableStateManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.MD5Hash;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
public class MasterProcedureTestingUtility {
private static final Log LOG = LogFactory.getLog(MasterProcedureTestingUtility.class);

View File

@ -18,14 +18,15 @@
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
@ -37,16 +38,12 @@ import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, MediumTests.class})
public class TestMasterProcedureEvents {
private static final Log LOG = LogFactory.getLog(TestCreateTableProcedure.class);
@ -141,7 +138,7 @@ public class TestMasterProcedureEvents {
private void testProcedureEventWaitWake(final HMaster master, final ProcedureEvent event,
final Procedure proc) throws Exception {
final ProcedureExecutor<MasterProcedureEnv> procExec = master.getMasterProcedureExecutor();
final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureQueue();
final MasterProcedureScheduler procSched = procExec.getEnvironment().getProcedureScheduler();
final long startPollCalls = procSched.getPollCalls();
final long startNullPollCalls = procSched.getNullPollCalls();

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.hbase.master.procedure;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
@ -25,7 +29,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.procedure2.Procedure;
@ -39,10 +42,6 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, SmallTests.class})
public class TestMasterProcedureScheduler {
private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
@ -89,8 +88,8 @@ public class TestMasterProcedureScheduler {
Procedure proc = queue.poll();
assertTrue(proc != null);
TableName tableName = ((TestTableProcedure)proc).getTableName();
queue.tryAcquireTableExclusiveLock(proc, tableName);
queue.releaseTableExclusiveLock(proc, tableName);
queue.waitTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
queue.completionCleanup(proc);
assertEquals(--count, queue.size());
assertEquals(i * 1000 + j, proc.getProcId());
@ -128,12 +127,12 @@ public class TestMasterProcedureScheduler {
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
// take the xlock
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
// table can't be deleted because we have the lock
assertEquals(0, queue.size());
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// release the xlock
queue.releaseTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
// complete the table deletion
assertTrue(queue.markTableAsDeleted(tableName, proc));
}
@ -164,7 +163,7 @@ public class TestMasterProcedureScheduler {
Procedure proc = procs[i] = queue.poll();
assertEquals(i + 1, proc.getProcId());
// take the rlock
assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
assertEquals(false, queue.waitTableSharedLock(proc, tableName));
// table can't be deleted because we have locks and/or items in the queue
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
}
@ -173,7 +172,7 @@ public class TestMasterProcedureScheduler {
// table can't be deleted because we have locks
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// release the rlock
queue.releaseTableSharedLock(procs[i], tableName);
queue.wakeTableSharedLock(procs[i], tableName);
}
// there are no items and no lock in the queeu
@ -202,48 +201,48 @@ public class TestMasterProcedureScheduler {
// Fetch the 1st item and take the write lock
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
// Fetch the 2nd item and verify that the lock can't be acquired
assertEquals(null, queue.poll(0));
// Release the write lock and acquire the read lock
queue.releaseTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
// Fetch the 2nd item and take the read lock
Procedure rdProc = queue.poll();
assertEquals(2, rdProc.getProcId());
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
// Fetch the 3rd item and verify that the lock can't be acquired
assertEquals(null, queue.poll(0));
// release the rdlock of item 2 and take the wrlock for the 3d item
queue.releaseTableSharedLock(rdProc, tableName);
queue.wakeTableSharedLock(rdProc, tableName);
// Fetch the 3rd item and take the write lock
Procedure wrProc = queue.poll();
assertEquals(true, queue.tryAcquireTableExclusiveLock(wrProc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(wrProc, tableName));
// Fetch 4th item and verify that the lock can't be acquired
assertEquals(null, queue.poll(0));
// Release the write lock and acquire the read lock
queue.releaseTableExclusiveLock(wrProc, tableName);
queue.wakeTableExclusiveLock(wrProc, tableName);
// Fetch the 4th item and take the read lock
rdProc = queue.poll();
assertEquals(4, rdProc.getProcId());
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc, tableName));
assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
// Fetch the 4th item and take the read lock
Procedure rdProc2 = queue.poll();
assertEquals(5, rdProc2.getProcId());
assertEquals(true, queue.tryAcquireTableSharedLock(rdProc2, tableName));
assertEquals(false, queue.waitTableSharedLock(rdProc2, tableName));
// Release 4th and 5th read-lock
queue.releaseTableSharedLock(rdProc, tableName);
queue.releaseTableSharedLock(rdProc2, tableName);
queue.wakeTableSharedLock(rdProc, tableName);
queue.wakeTableSharedLock(rdProc2, tableName);
// remove table queue
assertEquals(0, queue.size());
@ -268,34 +267,36 @@ public class TestMasterProcedureScheduler {
// Fetch the 1st item and take the write lock
Procedure procNs1 = queue.poll();
assertEquals(1, procNs1.getProcId());
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(procNs1, nsName1));
assertEquals(false, queue.waitNamespaceExclusiveLock(procNs1, nsName1));
// System tables have 2 as default priority
Procedure procNs2 = queue.poll();
assertEquals(4, procNs2.getProcId());
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(procNs2, nsName2));
queue.releaseNamespaceExclusiveLock(procNs2, nsName2);
assertEquals(false, queue.waitNamespaceExclusiveLock(procNs2, nsName2));
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
// add procNs2 back in the queue
queue.yield(procNs2);
// table on ns1 is locked, so we get table on ns2
procNs2 = queue.poll();
assertEquals(3, procNs2.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(procNs2, tableName2));
assertEquals(false, queue.waitTableExclusiveLock(procNs2, tableName2));
// ns2 is not available (TODO we may avoid this one)
Procedure procNs2b = queue.poll();
assertEquals(4, procNs2b.getProcId());
assertEquals(false, queue.tryAcquireNamespaceExclusiveLock(procNs2b, nsName2));
queue.yield(procNs2b);
assertEquals(true, queue.waitNamespaceExclusiveLock(procNs2b, nsName2));
// release the ns1 lock
queue.releaseNamespaceExclusiveLock(procNs1, nsName1);
queue.wakeNamespaceExclusiveLock(procNs1, nsName1);
// we are now able to execute table of ns1
long procId = queue.poll().getProcId();
assertEquals(2, procId);
queue.releaseTableExclusiveLock(procNs2, tableName2);
// release ns2
queue.wakeTableExclusiveLock(procNs2, tableName2);
// we are now able to execute ns2
procId = queue.poll().getProcId();
@ -314,35 +315,18 @@ public class TestMasterProcedureScheduler {
// Fetch the ns item and take the xlock
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(proc, nsName));
assertEquals(false, queue.waitNamespaceExclusiveLock(proc, nsName));
// the table operation can't be executed because the ns is locked
assertEquals(null, queue.poll(0));
// release the ns lock
queue.releaseNamespaceExclusiveLock(proc, nsName);
queue.wakeNamespaceExclusiveLock(proc, nsName);
proc = queue.poll();
assertEquals(2, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
queue.releaseTableExclusiveLock(proc, tableName);
}
@Test
public void testSharedLock() throws Exception {
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
final TableName tableName = TableName.valueOf("testtb");
TestTableProcedure procA =
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ);
TestTableProcedure procB =
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ);
assertTrue(queue.tryAcquireTableSharedLock(procA, tableName));
assertTrue(queue.tryAcquireTableSharedLock(procB, tableName));
queue.releaseTableSharedLock(procA, tableName);
queue.releaseTableSharedLock(procB, tableName);
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
queue.wakeTableExclusiveLock(proc, tableName);
}
@Test
@ -371,13 +355,13 @@ public class TestMasterProcedureScheduler {
// Fetch the 2nd item and take the xlock
proc = queue.poll();
assertEquals(2, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
// everything is locked by the table operation
assertEquals(null, queue.poll(0));
// release the table xlock
queue.releaseTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
// grab the last item in the queue
proc = queue.poll();
@ -410,13 +394,13 @@ public class TestMasterProcedureScheduler {
// Fetch the 1st item and take the write lock
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
// everything is locked by the table operation
assertEquals(null, queue.poll(0));
// release the table lock
queue.releaseTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
// Fetch the 2nd item and the the lock on regionA and regionB
Procedure mergeProc = queue.poll();
@ -475,7 +459,7 @@ public class TestMasterProcedureScheduler {
// Fetch the 1st item from the queue, "the root procedure" and take the table lock
Procedure rootProc = queue.poll();
assertEquals(1, rootProc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(rootProc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(rootProc, tableName));
assertEquals(null, queue.poll(0));
// Execute the 1st step of the root-proc.
@ -519,7 +503,7 @@ public class TestMasterProcedureScheduler {
assertEquals(null, queue.poll(0));
// release the table lock (for the root procedure)
queue.releaseTableExclusiveLock(rootProc, tableName);
queue.wakeTableExclusiveLock(rootProc, tableName);
}
@Test
@ -639,7 +623,7 @@ public class TestMasterProcedureScheduler {
// fetch and acquire first xlock proc
Procedure parentProc = queue.poll();
assertEquals(rootProc, parentProc);
assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
// add child procedure
for (int i = 0; i < childProcs.length; ++i) {
@ -662,13 +646,13 @@ public class TestMasterProcedureScheduler {
assertEquals(null, queue.poll(0));
// release xlock
queue.releaseTableExclusiveLock(parentProc, tableName);
queue.wakeTableExclusiveLock(parentProc, tableName);
// fetch the other xlock proc
Procedure proc = queue.poll();
assertEquals(100, proc.getProcId());
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
queue.releaseTableExclusiveLock(proc, tableName);
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
queue.wakeTableExclusiveLock(proc, tableName);
}
@Test
@ -697,7 +681,7 @@ public class TestMasterProcedureScheduler {
// fetch and acquire first xlock proc
Procedure parentProc = queue.poll();
assertEquals(rootProc, parentProc);
assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
// add child procedure
queue.addFront(childProc);
@ -705,11 +689,11 @@ public class TestMasterProcedureScheduler {
// fetch the other xlock proc
Procedure proc = queue.poll();
assertEquals(childProc, proc);
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
queue.releaseTableExclusiveLock(proc, tableName);
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
queue.wakeTableExclusiveLock(proc, tableName);
// release xlock
queue.releaseTableExclusiveLock(parentProc, tableName);
queue.wakeTableExclusiveLock(parentProc, tableName);
}
@Test
@ -724,7 +708,7 @@ public class TestMasterProcedureScheduler {
// fetch from the queue and acquire xlock for the first proc
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
// nothing available, until xlock release
assertEquals(null, queue.poll(0));
@ -737,7 +721,7 @@ public class TestMasterProcedureScheduler {
assertEquals(1, proc.getProcId());
// release the xlock
queue.releaseTableExclusiveLock(proc, tableName);
queue.wakeTableExclusiveLock(proc, tableName);
proc = queue.poll();
assertEquals(2, proc.getProcId());
@ -757,12 +741,12 @@ public class TestMasterProcedureScheduler {
// fetch and acquire the first shared-lock
Procedure proc1 = queue.poll();
assertEquals(1, proc1.getProcId());
assertEquals(true, queue.tryAcquireTableSharedLock(proc1, tableName));
assertEquals(false, queue.waitTableSharedLock(proc1, tableName));
// fetch and acquire the second shared-lock
Procedure proc2 = queue.poll();
assertEquals(2, proc2.getProcId());
assertEquals(true, queue.tryAcquireTableSharedLock(proc2, tableName));
assertEquals(false, queue.waitTableSharedLock(proc2, tableName));
// nothing available, until xlock release
assertEquals(null, queue.poll(0));
@ -778,8 +762,8 @@ public class TestMasterProcedureScheduler {
assertEquals(2, proc2.getProcId());
// release the xlock
queue.releaseTableSharedLock(proc1, tableName);
queue.releaseTableSharedLock(proc2, tableName);
queue.wakeTableSharedLock(proc1, tableName);
queue.wakeTableSharedLock(proc2, tableName);
Procedure proc3 = queue.poll();
assertEquals(3, proc3.getProcId());

View File

@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -68,60 +66,6 @@ public class TestMasterProcedureSchedulerConcurrency {
queue.clear();
}
@Test(timeout=60000)
public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureScheduler procQueue = queue;
final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false);
Thread createThread = new Thread() {
@Override
public void run() {
try {
TestTableProcedure proc = new TestTableProcedure(1, table,
TableProcedureInterface.TableOperationType.CREATE);
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
procQueue.releaseTableExclusiveLock(proc, table);
}
}
} catch (Throwable e) {
LOG.error("create failed", e);
failure.set(true);
}
}
};
Thread deleteThread = new Thread() {
@Override
public void run() {
try {
TestTableProcedure proc = new TestTableProcedure(2, table,
TableProcedureInterface.TableOperationType.DELETE);
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
procQueue.releaseTableExclusiveLock(proc, table);
}
procQueue.markTableAsDeleted(table, proc);
}
} catch (Throwable e) {
LOG.error("delete failed", e);
failure.set(true);
}
}
};
createThread.start();
deleteThread.start();
for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
Thread.sleep(100);
}
running.set(false);
createThread.join();
deleteThread.join();
assertEquals(false, failure.get());
}
/**
* Verify that "write" operations for a single table are serialized,
* but different tables can be executed in parallel.
@ -237,26 +181,22 @@ public class TestMasterProcedureSchedulerConcurrency {
public Procedure acquire() {
Procedure proc = null;
boolean avail = false;
while (!avail) {
proc = queue.poll();
if (proc == null) break;
boolean waiting = true;
while (waiting && queue.size() > 0) {
proc = queue.poll(100000000L);
if (proc == null) continue;
switch (getTableOperationType(proc)) {
case CREATE:
case DELETE:
case EDIT:
avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
waiting = queue.waitTableExclusiveLock(proc, getTableName(proc));
break;
case READ:
avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
waiting = queue.waitTableSharedLock(proc, getTableName(proc));
break;
default:
throw new UnsupportedOperationException();
}
if (!avail) {
addFront(proc);
LOG.debug("yield procId=" + proc);
}
}
return proc;
}
@ -266,10 +206,10 @@ public class TestMasterProcedureSchedulerConcurrency {
case CREATE:
case DELETE:
case EDIT:
queue.releaseTableExclusiveLock(proc, getTableName(proc));
queue.wakeTableExclusiveLock(proc, getTableName(proc));
break;
case READ:
queue.releaseTableSharedLock(proc, getTableName(proc));
queue.wakeTableSharedLock(proc, getTableName(proc));
break;
}
}