HBASE-21375 Revisit the lock and queue implementation in MasterProcedureScheduler
This commit is contained in:
parent
6fbd70a611
commit
eb0f9e15d1
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Stream;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
@ -26,39 +27,36 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
* Locking for mutual exclusion between procedures. Used only by procedure framework internally.
|
||||
* {@link LockAndQueue} has two purposes:
|
||||
* <ol>
|
||||
* <li>Acquire/release exclusive/shared locks.</li>
|
||||
* <li>Maintains a list of procedures waiting on this lock.
|
||||
* {@link LockAndQueue} extends {@link ProcedureDeque} class. Blocked Procedures are added
|
||||
* to our super Deque. Using inheritance over composition to keep the Deque of waiting
|
||||
* Procedures is unusual, but we do it this way because in certain cases, there will be
|
||||
* millions of regions. This layout uses less memory.
|
||||
* <li>Acquire/release exclusive/shared locks.</li>
|
||||
* <li>Maintains a list of procedures waiting on this lock. {@link LockAndQueue} extends
|
||||
* {@link ProcedureDeque} class. Blocked Procedures are added to our super Deque. Using inheritance
|
||||
* over composition to keep the Deque of waiting Procedures is unusual, but we do it this way
|
||||
* because in certain cases, there will be millions of regions. This layout uses less memory.
|
||||
* </ol>
|
||||
*
|
||||
* <p>NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
|
||||
* guarded by schedLock().
|
||||
* <br>
|
||||
* <p/>
|
||||
* NOT thread-safe. Needs external concurrency control: e.g. uses in MasterProcedureScheduler are
|
||||
* guarded by schedLock(). <br/>
|
||||
* There is no need of 'volatile' keyword for member variables because of memory synchronization
|
||||
* guarantees of locks (see 'Memory Synchronization',
|
||||
* http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html)
|
||||
* <br>
|
||||
* We do not implement Lock interface because we need exclusive and shared locking, and also
|
||||
* because try-lock functions require procedure id.
|
||||
* <br>
|
||||
* http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/Lock.html) <br/>
|
||||
* We do not implement Lock interface because we need exclusive and shared locking, and also because
|
||||
* try-lock functions require procedure id. <br/>
|
||||
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class LockAndQueue implements LockStatus {
|
||||
|
||||
private final Function<Long, Procedure<?>> procedureRetriever;
|
||||
private final ProcedureDeque queue = new ProcedureDeque();
|
||||
private Procedure<?> exclusiveLockOwnerProcedure = null;
|
||||
private int sharedLock = 0;
|
||||
|
||||
// ======================================================================
|
||||
// Lock Status
|
||||
// Lock Status
|
||||
// ======================================================================
|
||||
|
||||
@Override
|
||||
public boolean isLocked() {
|
||||
return hasExclusiveLock() || sharedLock > 0;
|
||||
public LockAndQueue(Function<Long, Procedure<?>> procedureRetriever) {
|
||||
this.procedureRetriever = procedureRetriever;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -66,22 +64,32 @@ public class LockAndQueue implements LockStatus {
|
|||
return this.exclusiveLockOwnerProcedure != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLockOwner(long procId) {
|
||||
return getExclusiveLockProcIdOwner() == procId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasParentLock(Procedure<?> proc) {
|
||||
// TODO: need to check all the ancestors. need to passed in the procedures
|
||||
// to find the ancestors.
|
||||
return proc.hasParent() &&
|
||||
(isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasLockAccess(Procedure<?> proc) {
|
||||
return isLockOwner(proc.getProcId()) || hasParentLock(proc);
|
||||
if (exclusiveLockOwnerProcedure == null) {
|
||||
return false;
|
||||
}
|
||||
long lockOwnerId = exclusiveLockOwnerProcedure.getProcId();
|
||||
if (proc.getProcId() == lockOwnerId) {
|
||||
return true;
|
||||
}
|
||||
if (!proc.hasParent()) {
|
||||
return false;
|
||||
}
|
||||
// fast path to check root procedure
|
||||
if (proc.getRootProcId() == lockOwnerId) {
|
||||
return true;
|
||||
}
|
||||
// check ancestors
|
||||
for (Procedure<?> p = proc;;) {
|
||||
if (p.getParentProcId() == lockOwnerId) {
|
||||
return true;
|
||||
}
|
||||
p = procedureRetriever.apply(p.getParentProcId());
|
||||
if (p == null || !p.hasParent()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -89,22 +97,13 @@ public class LockAndQueue implements LockStatus {
|
|||
return exclusiveLockOwnerProcedure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getExclusiveLockProcIdOwner() {
|
||||
if (exclusiveLockOwnerProcedure == null) {
|
||||
return Long.MIN_VALUE;
|
||||
} else {
|
||||
return exclusiveLockOwnerProcedure.getProcId();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSharedLockCount() {
|
||||
return sharedLock;
|
||||
}
|
||||
|
||||
// ======================================================================
|
||||
// try/release Shared/Exclusive lock
|
||||
// try/release Shared/Exclusive lock
|
||||
// ======================================================================
|
||||
|
||||
/**
|
||||
|
@ -143,7 +142,8 @@ public class LockAndQueue implements LockStatus {
|
|||
* @return whether we should wake the procedures waiting on the lock here.
|
||||
*/
|
||||
public boolean releaseExclusiveLock(Procedure<?> proc) {
|
||||
if (!isLockOwner(proc.getProcId())) {
|
||||
if (exclusiveLockOwnerProcedure == null ||
|
||||
exclusiveLockOwnerProcedure.getProcId() != proc.getProcId()) {
|
||||
// We are not the lock owner, it is probably inherited from the parent procedures.
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -21,25 +21,52 @@ package org.apache.hadoop.hbase.procedure2;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Interface to get status of a Lock without getting access to acquire/release lock.
|
||||
* Currently used in MasterProcedureScheduler where we want to give Queues access to lock's
|
||||
* status for scheduling purposes, but not the ability to acquire/release it.
|
||||
* Interface to get status of a Lock without getting access to acquire/release lock. Currently used
|
||||
* in MasterProcedureScheduler where we want to give Queues access to lock's status for scheduling
|
||||
* purposes, but not the ability to acquire/release it.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface LockStatus {
|
||||
boolean isLocked();
|
||||
|
||||
/**
|
||||
* Return whether this lock has already been held,
|
||||
* <p/>
|
||||
* Notice that, holding the exclusive lock or shared lock are both considered as locked, i.e, this
|
||||
* method usually equals to {@code hasExclusiveLock() || getSharedLockCount() > 0}.
|
||||
*/
|
||||
default boolean isLocked() {
|
||||
return hasExclusiveLock() || getSharedLockCount() > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the exclusive lock has been held.
|
||||
*/
|
||||
boolean hasExclusiveLock();
|
||||
|
||||
boolean isLockOwner(long procId);
|
||||
|
||||
boolean hasParentLock(Procedure<?> proc);
|
||||
|
||||
/**
|
||||
* Return true if the procedure itself holds the exclusive lock, or any ancestors of the give
|
||||
* procedure hold the exclusive lock.
|
||||
*/
|
||||
boolean hasLockAccess(Procedure<?> proc);
|
||||
|
||||
/**
|
||||
* Get the procedure which holds the exclusive lock.
|
||||
*/
|
||||
Procedure<?> getExclusiveLockOwnerProcedure();
|
||||
|
||||
long getExclusiveLockProcIdOwner();
|
||||
/**
|
||||
* Return the id of the procedure which holds the exclusive lock, if exists. Or a negative value
|
||||
* which means no one holds the exclusive lock.
|
||||
* <p/>
|
||||
* Notice that, in HBase, we assume that the procedure id is positive, or at least non-negative.
|
||||
*/
|
||||
default long getExclusiveLockProcIdOwner() {
|
||||
Procedure<?> proc = getExclusiveLockOwnerProcedure();
|
||||
return proc != null ? proc.getProcId() : -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of procedures which hold the shared lock.
|
||||
*/
|
||||
int getSharedLockCount();
|
||||
}
|
||||
|
|
|
@ -329,7 +329,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
* @see #holdLock(Object)
|
||||
* @return true if the procedure has the lock, false otherwise.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public final boolean hasLock() {
|
||||
return locked;
|
||||
}
|
||||
|
@ -714,12 +713,20 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
/**
|
||||
* Will only be called when loading procedures from procedure store, where we need to record
|
||||
* whether the procedure has already held a lock. Later we will call
|
||||
* {@link #doAcquireLock(Object, ProcedureStore)} to actually acquire the lock.
|
||||
* {@link #restoreLock(Object, ProcedureStore)} to actually acquire the lock.
|
||||
*/
|
||||
final void lockedWhenLoading() {
|
||||
this.lockedWhenLoading = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Can only be called when restarting, before the procedure actually being executed, as after we
|
||||
* actually call the {@link #doAcquireLock(Object, ProcedureStore)} method, we will reset
|
||||
* {@link #lockedWhenLoading} to false.
|
||||
* <p/>
|
||||
* Now it is only used in the ProcedureScheduler to determine whether we should put a Procedure in
|
||||
* front of a queue.
|
||||
*/
|
||||
public boolean isLockedWhenLoading() {
|
||||
return lockedWhenLoading;
|
||||
}
|
||||
|
@ -993,7 +1000,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure<TE
|
|||
// this can happen if the parent stores the sub procedures but before it can
|
||||
// release its lock, the master restarts
|
||||
if (getState() == ProcedureState.WAITING && !holdLock(env)) {
|
||||
LOG.debug("{} is in WAITING STATE, and holdLock= false, skip acquiring lock.", this);
|
||||
LOG.debug("{} is in WAITING STATE, and holdLock=false, skip acquiring lock.", this);
|
||||
lockedWhenLoading = false;
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -637,14 +637,15 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
proc.afterReplay(getEnvironment());
|
||||
}
|
||||
});
|
||||
// 4. restore locks
|
||||
restoreLocks();
|
||||
|
||||
// 4. Push the procedures to the timeout executor
|
||||
// 5. Push the procedures to the timeout executor
|
||||
waitingTimeoutList.forEach(proc -> {
|
||||
proc.afterReplay(getEnvironment());
|
||||
timeoutExecutor.add(proc);
|
||||
});
|
||||
// 5. restore locks
|
||||
restoreLocks();
|
||||
|
||||
// 6. Push the procedure to the scheduler
|
||||
failedList.forEach(scheduler::addBack);
|
||||
runnableList.forEach(p -> {
|
||||
|
@ -652,26 +653,7 @@ public class ProcedureExecutor<TEnvironment> {
|
|||
if (!p.hasParent()) {
|
||||
sendProcedureLoadedNotification(p.getProcId());
|
||||
}
|
||||
// If the procedure holds the lock, put the procedure in front
|
||||
// If its parent holds the lock, put the procedure in front
|
||||
// TODO. Is that possible that its ancestor holds the lock?
|
||||
// For now, the deepest procedure hierarchy is:
|
||||
// ModifyTableProcedure -> ReopenTableProcedure ->
|
||||
// MoveTableProcedure -> Unassign/AssignProcedure
|
||||
// But ModifyTableProcedure and ReopenTableProcedure won't hold the lock
|
||||
// So, check parent lock is enough(a tricky case is resovled by HBASE-21384).
|
||||
// If some one change or add new procedures making 'grandpa' procedure
|
||||
// holds the lock, but parent procedure don't hold the lock, there will
|
||||
// be a problem here. We have to check one procedure's ancestors.
|
||||
// And we need to change LockAndQueue.hasParentLock(Procedure<?> proc) method
|
||||
// to check all ancestors too.
|
||||
if (p.isLockedWhenLoading() || (p.hasParent() && procedures
|
||||
.get(p.getParentProcId()).isLockedWhenLoading())) {
|
||||
scheduler.addFront(p, false);
|
||||
} else {
|
||||
// if it was not, it can wait.
|
||||
scheduler.addBack(p, false);
|
||||
}
|
||||
scheduler.addBack(p);
|
||||
});
|
||||
// After all procedures put into the queue, signal the worker threads.
|
||||
// Otherwise, there is a race condition. See HBASE-21364.
|
||||
|
|
|
@ -403,7 +403,7 @@ public class ProcedureTestingUtility {
|
|||
public NoopProcedure() {}
|
||||
|
||||
@Override
|
||||
protected Procedure[] execute(TEnv env)
|
||||
protected Procedure<TEnv>[] execute(TEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
return null;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,68 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.procedure2;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestLockAndQueue {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestLockAndQueue.class);
|
||||
|
||||
@Test
|
||||
public void testHasLockAccess() {
|
||||
Map<Long, NoopProcedure<Void>> procMap = new HashMap<>();
|
||||
for (long i = 1; i <= 10; i++) {
|
||||
NoopProcedure<Void> proc = new NoopProcedure<>();
|
||||
proc.setProcId(i);
|
||||
if (i > 1) {
|
||||
proc.setParentProcId(i - 1);
|
||||
proc.setRootProcId(1);
|
||||
}
|
||||
procMap.put(i, proc);
|
||||
}
|
||||
LockAndQueue laq = new LockAndQueue(procMap::get);
|
||||
for (long i = 1; i <= 10; i++) {
|
||||
assertFalse(laq.hasLockAccess(procMap.get(i)));
|
||||
}
|
||||
for (long i = 1; i <= 10; i++) {
|
||||
NoopProcedure<Void> procHasLock = procMap.get(i);
|
||||
laq.tryExclusiveLock(procHasLock);
|
||||
for (long j = 1; j < i; j++) {
|
||||
assertFalse(laq.hasLockAccess(procMap.get(j)));
|
||||
}
|
||||
for (long j = i; j <= 10; j++) {
|
||||
assertTrue(laq.hasLockAccess(procMap.get(j)));
|
||||
}
|
||||
laq.releaseExclusiveLock(procHasLock);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -82,7 +82,8 @@ public class MasterProcedureEnv implements ConfigurationObserver {
|
|||
public MasterProcedureEnv(final MasterServices master,
|
||||
final RSProcedureDispatcher remoteDispatcher) {
|
||||
this.master = master;
|
||||
this.procSched = new MasterProcedureScheduler();
|
||||
this.procSched = new MasterProcedureScheduler(
|
||||
procId -> master.getMasterProcedureExecutor().getProcedure(procId));
|
||||
this.remoteDispatcher = remoteDispatcher;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
|
|||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -29,6 +30,7 @@ import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOpera
|
|||
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
|
||||
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
|
||||
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
|
||||
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
|
@ -114,11 +116,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
private PeerQueue peerMap = null;
|
||||
private MetaQueue metaMap = null;
|
||||
|
||||
private final SchemaLocking locking = new SchemaLocking();
|
||||
private final SchemaLocking locking;
|
||||
|
||||
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
|
||||
locking = new SchemaLocking(procedureRetriever);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void yield(final Procedure proc) {
|
||||
push(proc, isTableProcedure(proc), true);
|
||||
push(proc, false, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -142,18 +148,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq,
|
||||
final Queue<T> queue, final Procedure<?> proc, final boolean addFront) {
|
||||
if (!queue.getLockStatus().hasExclusiveLock()) {
|
||||
// if the queue was not remove for an xlock execution,put the queue back into execution
|
||||
queue.add(proc, addFront);
|
||||
private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
|
||||
Procedure<?> proc, boolean addFront) {
|
||||
queue.add(proc, addFront);
|
||||
// For the following conditions, we will put the queue back into execution
|
||||
// 1. The procedure has already held the lock, or the lock has been restored when restarting,
|
||||
// which means it can be executed immediately.
|
||||
// 2. The exclusive lock for this queue has not been held.
|
||||
// 3. The given procedure has the exclusive lock permission for this queue.
|
||||
if (proc.hasLock() || proc.isLockedWhenLoading() || !queue.getLockStatus().hasExclusiveLock() ||
|
||||
queue.getLockStatus().hasLockAccess(proc)) {
|
||||
addToRunQueue(fairq, queue);
|
||||
} else if (queue.getLockStatus().hasLockAccess(proc)) {
|
||||
// always add it to front as the have the lock access.
|
||||
queue.add(proc, true);
|
||||
addToRunQueue(fairq, queue);
|
||||
} else {
|
||||
queue.add(proc, addFront);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -182,38 +187,40 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
return pollResult;
|
||||
}
|
||||
|
||||
private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
|
||||
LockStatus s = rq.getLockStatus();
|
||||
// if we have the lock access, we are ready
|
||||
if (s.hasLockAccess(proc)) {
|
||||
return true;
|
||||
}
|
||||
boolean xlockReq = rq.requireExclusiveLock(proc);
|
||||
// if we need to hold the xlock, then we need to make sure that no one holds any lock, including
|
||||
// the shared lock, otherwise, we just need to make sure that no one holds the xlock
|
||||
return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
|
||||
}
|
||||
|
||||
private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
|
||||
final Queue<T> rq = fairq.poll();
|
||||
Queue<T> rq = fairq.poll();
|
||||
if (rq == null || !rq.isAvailable()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final Procedure<?> pollResult = rq.peek();
|
||||
if (pollResult == null) {
|
||||
return null;
|
||||
}
|
||||
final boolean xlockReq = rq.requireExclusiveLock(pollResult);
|
||||
if (xlockReq && rq.getLockStatus().isLocked() && !rq.getLockStatus().hasLockAccess(pollResult)) {
|
||||
// someone is already holding the lock (e.g. shared lock). avoid a yield
|
||||
removeFromRunQueue(fairq, rq);
|
||||
return null;
|
||||
}
|
||||
|
||||
rq.poll();
|
||||
if (rq.isEmpty() || xlockReq) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
} else if (rq.getLockStatus().hasParentLock(pollResult)) {
|
||||
// if the rq is in the fairq because of runnable child
|
||||
// check if the next procedure is still a child.
|
||||
// if not, remove the rq from the fairq and go back to the xlock state
|
||||
Procedure<?> nextProc = rq.peek();
|
||||
if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult) &&
|
||||
nextProc.getRootProcId() != pollResult.getRootProcId()) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
// loop until we find out a procedure which is ready to run, or if we have checked all the
|
||||
// procedures, then we give up and remove the queue from run queue.
|
||||
for (int i = 0, n = rq.size(); i < n; i++) {
|
||||
Procedure<?> proc = rq.poll();
|
||||
if (isLockReady(proc, rq)) {
|
||||
// the queue is empty, remove from run queue
|
||||
if (rq.isEmpty()) {
|
||||
removeFromRunQueue(fairq, rq);
|
||||
}
|
||||
return proc;
|
||||
}
|
||||
// we are not ready to run, add back and try the next procedure
|
||||
rq.add(proc, false);
|
||||
}
|
||||
|
||||
return pollResult;
|
||||
// no procedure is ready for execution, remove from run queue
|
||||
removeFromRunQueue(fairq, rq);
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -60,21 +60,8 @@ abstract class Queue<TKey extends Comparable<TKey>> extends AvlLinkedNode<Queue<
|
|||
return lockStatus;
|
||||
}
|
||||
|
||||
// This should go away when we have the new AM and its events
|
||||
// and we move xlock to the lock-event-queue.
|
||||
public boolean isAvailable() {
|
||||
if (isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
if (getLockStatus().hasExclusiveLock()) {
|
||||
// If we have an exclusive lock already taken, only child of the lock owner can be executed
|
||||
// And now we will restore locks when master restarts, so it is possible that the procedure
|
||||
// which is holding the lock is also in the queue, so we need to use hasLockAccess here
|
||||
// instead of hasParentLock
|
||||
Procedure<?> nextProc = peek();
|
||||
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
|
||||
}
|
||||
return true;
|
||||
return !isEmpty();
|
||||
}
|
||||
|
||||
// ======================================================================
|
||||
|
|
|
@ -45,18 +45,25 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
class SchemaLocking {
|
||||
|
||||
private final Function<Long, Procedure<?>> procedureRetriever;
|
||||
private final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
|
||||
private final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
|
||||
private final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
|
||||
// Single map for all regions irrespective of tables. Key is encoded region name.
|
||||
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
|
||||
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
|
||||
private final LockAndQueue metaLock = new LockAndQueue();
|
||||
private final LockAndQueue metaLock;
|
||||
|
||||
public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
|
||||
this.procedureRetriever = procedureRetriever;
|
||||
this.metaLock = new LockAndQueue(procedureRetriever);
|
||||
}
|
||||
|
||||
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
|
||||
LockAndQueue lock = map.get(key);
|
||||
if (lock == null) {
|
||||
lock = new LockAndQueue();
|
||||
lock = new LockAndQueue(procedureRetriever);
|
||||
map.put(key, lock);
|
||||
}
|
||||
return lock;
|
||||
|
|
|
@ -34,21 +34,7 @@ class TableQueue extends Queue<TableName> {
|
|||
|
||||
@Override
|
||||
public boolean isAvailable() {
|
||||
// if there are no items in the queue, or the namespace is locked.
|
||||
// we can't execute operation on this table
|
||||
if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (getLockStatus().hasExclusiveLock()) {
|
||||
// if we have an exclusive lock already taken
|
||||
// only child of the lock owner can be executed
|
||||
final Procedure<?> nextProc = peek();
|
||||
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
|
||||
}
|
||||
|
||||
// no xlock
|
||||
return true;
|
||||
return !isEmpty() && !namespaceLockStatus.hasExclusiveLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -256,7 +256,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
|
|||
|
||||
@Override
|
||||
protected int doWork() throws Exception {
|
||||
procedureScheduler = new MasterProcedureScheduler();
|
||||
procedureScheduler = new MasterProcedureScheduler(pid -> null);
|
||||
procedureScheduler.start();
|
||||
setupOperations();
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@ import java.lang.reflect.Method;
|
|||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
|
@ -55,12 +54,12 @@ import org.junit.rules.TestName;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@Category({MasterTests.class, SmallTests.class})
|
||||
@Category({ MasterTests.class, SmallTests.class })
|
||||
public class TestMasterProcedureScheduler {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestMasterProcedureScheduler.class);
|
||||
HBaseClassTestRule.forClass(TestMasterProcedureScheduler.class);
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class);
|
||||
|
||||
|
@ -71,7 +70,7 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
queue = new MasterProcedureScheduler();
|
||||
queue = new MasterProcedureScheduler(pid -> null);
|
||||
queue.start();
|
||||
}
|
||||
|
||||
|
@ -104,9 +103,9 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
for (int j = 1; j <= NUM_ITEMS; ++j) {
|
||||
for (int i = 1; i <= NUM_TABLES; ++i) {
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertTrue(proc != null);
|
||||
TableName tableName = ((TestTableProcedure)proc).getTableName();
|
||||
TableName tableName = ((TestTableProcedure) proc).getTableName();
|
||||
queue.waitTableExclusiveLock(proc, tableName);
|
||||
queue.wakeTableExclusiveLock(proc, tableName);
|
||||
queue.completionCleanup(proc);
|
||||
|
@ -118,32 +117,32 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
for (int i = 1; i <= NUM_TABLES; ++i) {
|
||||
final TableName tableName = TableName.valueOf(String.format("test-%04d", i));
|
||||
final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
|
||||
TableProcedureInterface.TableOperationType.DELETE);
|
||||
final TestTableProcedure dummyProc =
|
||||
new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
|
||||
// complete the table deletion
|
||||
assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check that the table queue is not deletable until every procedure
|
||||
* in-progress is completed (this is a special case for write-locks).
|
||||
* Check that the table queue is not deletable until every procedure in-progress is completed
|
||||
* (this is a special case for write-locks).
|
||||
*/
|
||||
@Test
|
||||
public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
|
||||
final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
|
||||
TableProcedureInterface.TableOperationType.DELETE);
|
||||
final TestTableProcedure dummyProc =
|
||||
new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// table can't be deleted because one item is in the queue
|
||||
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
|
||||
|
||||
// fetch item and take a lock
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
// take the xlock
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
|
@ -157,29 +156,30 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check that the table queue is not deletable until every procedure
|
||||
* in-progress is completed (this is a special case for read-locks).
|
||||
* Check that the table queue is not deletable until every procedure in-progress is completed
|
||||
* (this is a special case for read-locks).
|
||||
*/
|
||||
@Test
|
||||
public void testCreateDeleteTableOperationsWithReadLock() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final int nitems = 2;
|
||||
|
||||
final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
|
||||
TableProcedureInterface.TableOperationType.DELETE);
|
||||
final TestTableProcedure dummyProc =
|
||||
new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.DELETE);
|
||||
|
||||
for (int i = 1; i <= nitems; ++i) {
|
||||
queue.addBack(new TestTableProcedure(i, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(i, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
}
|
||||
|
||||
// table can't be deleted because one item is in the queue
|
||||
assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
|
||||
|
||||
Procedure[] procs = new Procedure[nitems];
|
||||
Procedure<?>[] procs = new Procedure[nitems];
|
||||
for (int i = 0; i < nitems; ++i) {
|
||||
// fetch item and take a lock
|
||||
Procedure proc = procs[i] = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
procs[i] = proc;
|
||||
assertEquals(i + 1, proc.getProcId());
|
||||
// take the rlock
|
||||
assertEquals(false, queue.waitTableSharedLock(proc, tableName));
|
||||
|
@ -206,19 +206,15 @@ public class TestMasterProcedureScheduler {
|
|||
@Test
|
||||
public void testVerifyRwLocks() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(4, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(5, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(3, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// Fetch the 1st item and take the write lock
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
|
||||
|
@ -229,7 +225,7 @@ public class TestMasterProcedureScheduler {
|
|||
queue.wakeTableExclusiveLock(proc, tableName);
|
||||
|
||||
// Fetch the 2nd item and take the read lock
|
||||
Procedure rdProc = queue.poll();
|
||||
Procedure<?> rdProc = queue.poll();
|
||||
assertEquals(2, rdProc.getProcId());
|
||||
assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
|
||||
|
||||
|
@ -239,8 +235,13 @@ public class TestMasterProcedureScheduler {
|
|||
// release the rdlock of item 2 and take the wrlock for the 3d item
|
||||
queue.wakeTableSharedLock(rdProc, tableName);
|
||||
|
||||
queue.addBack(
|
||||
new TestTableProcedure(4, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(5, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
|
||||
// Fetch the 3rd item and take the write lock
|
||||
Procedure wrProc = queue.poll();
|
||||
Procedure<?> wrProc = queue.poll();
|
||||
assertEquals(false, queue.waitTableExclusiveLock(wrProc, tableName));
|
||||
|
||||
// Fetch 4th item and verify that the lock can't be acquired
|
||||
|
@ -255,7 +256,7 @@ public class TestMasterProcedureScheduler {
|
|||
assertEquals(false, queue.waitTableSharedLock(rdProc, tableName));
|
||||
|
||||
// Fetch the 4th item and take the read lock
|
||||
Procedure rdProc2 = queue.poll();
|
||||
Procedure<?> rdProc2 = queue.poll();
|
||||
assertEquals(5, rdProc2.getProcId());
|
||||
assertEquals(false, queue.waitTableSharedLock(rdProc2, tableName));
|
||||
|
||||
|
@ -274,22 +275,22 @@ public class TestMasterProcedureScheduler {
|
|||
String nsName2 = "ns2";
|
||||
TableName tableName1 = TableName.valueOf(nsName1, name.getMethodName());
|
||||
TableName tableName2 = TableName.valueOf(nsName2, name.getMethodName());
|
||||
queue.addBack(new TestNamespaceProcedure(1, nsName1,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(2, tableName1,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(3, tableName2,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestNamespaceProcedure(4, nsName2,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestNamespaceProcedure(1, nsName1, TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName1, TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(3, tableName2, TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestNamespaceProcedure(4, nsName2, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// Fetch the 1st item and take the write lock
|
||||
Procedure procNs1 = queue.poll();
|
||||
Procedure<?> procNs1 = queue.poll();
|
||||
assertEquals(1, procNs1.getProcId());
|
||||
assertFalse(queue.waitNamespaceExclusiveLock(procNs1, nsName1));
|
||||
|
||||
// namespace table has higher priority so we still return procedure for it
|
||||
Procedure procNs2 = queue.poll();
|
||||
Procedure<?> procNs2 = queue.poll();
|
||||
assertEquals(4, procNs2.getProcId());
|
||||
assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
|
||||
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
|
||||
|
@ -324,13 +325,13 @@ public class TestMasterProcedureScheduler {
|
|||
public void testVerifyNamespaceXLock() throws Exception {
|
||||
String nsName = "ns1";
|
||||
TableName tableName = TableName.valueOf(nsName, name.getMethodName());
|
||||
queue.addBack(new TestNamespaceProcedure(1, nsName,
|
||||
TableProcedureInterface.TableOperationType.CREATE));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestNamespaceProcedure(1, nsName, TableProcedureInterface.TableOperationType.CREATE));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
|
||||
// Fetch the ns item and take the xlock
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(false, queue.waitNamespaceExclusiveLock(proc, nsName));
|
||||
|
||||
|
@ -349,17 +350,16 @@ public class TestMasterProcedureScheduler {
|
|||
@Test
|
||||
public void testXLockWaitingForExecutingSharedLockToRelease() {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
|
||||
final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
|
||||
|
||||
queue.addBack(new TestRegionProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_ASSIGN, regionA));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestRegionProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionA));
|
||||
TableProcedureInterface.TableOperationType.REGION_ASSIGN, regionA));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// Fetch the 1st item and take the shared lock
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(proc, regionA));
|
||||
|
||||
|
@ -374,6 +374,9 @@ public class TestMasterProcedureScheduler {
|
|||
assertEquals(2, proc.getProcId());
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
|
||||
queue.addBack(new TestRegionProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionA));
|
||||
|
||||
// everything is locked by the table operation
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
|
@ -393,23 +396,26 @@ public class TestMasterProcedureScheduler {
|
|||
@Test
|
||||
public void testVerifyRegionLocks() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
|
||||
final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c"));
|
||||
final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d"));
|
||||
final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
|
||||
final RegionInfo regionB = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
|
||||
final RegionInfo regionC = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestRegionProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_MERGE, regionA, regionB));
|
||||
TableProcedureInterface.TableOperationType.REGION_MERGE, regionA, regionB));
|
||||
queue.addBack(new TestRegionProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_SPLIT, regionA));
|
||||
TableProcedureInterface.TableOperationType.REGION_SPLIT, regionA));
|
||||
queue.addBack(new TestRegionProcedure(4, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_SPLIT, regionB));
|
||||
TableProcedureInterface.TableOperationType.REGION_SPLIT, regionB));
|
||||
queue.addBack(new TestRegionProcedure(5, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionC));
|
||||
TableProcedureInterface.TableOperationType.REGION_UNASSIGN, regionC));
|
||||
|
||||
// Fetch the 1st item and take the write lock
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
|
||||
|
@ -420,24 +426,24 @@ public class TestMasterProcedureScheduler {
|
|||
queue.wakeTableExclusiveLock(proc, tableName);
|
||||
|
||||
// Fetch the 2nd item and the the lock on regionA and regionB
|
||||
Procedure mergeProc = queue.poll();
|
||||
Procedure<?> mergeProc = queue.poll();
|
||||
assertEquals(2, mergeProc.getProcId());
|
||||
assertEquals(false, queue.waitRegions(mergeProc, tableName, regionA, regionB));
|
||||
|
||||
// Fetch the 3rd item and the try to lock region A which will fail
|
||||
// because already locked. this procedure will go in waiting.
|
||||
// (this stuff will be explicit until we get rid of the zk-lock)
|
||||
Procedure procA = queue.poll();
|
||||
Procedure<?> procA = queue.poll();
|
||||
assertEquals(3, procA.getProcId());
|
||||
assertEquals(true, queue.waitRegions(procA, tableName, regionA));
|
||||
|
||||
// Fetch the 4th item, same story as the 3rd
|
||||
Procedure procB = queue.poll();
|
||||
Procedure<?> procB = queue.poll();
|
||||
assertEquals(4, procB.getProcId());
|
||||
assertEquals(true, queue.waitRegions(procB, tableName, regionB));
|
||||
|
||||
// Fetch the 5th item, since it is a non-locked region we are able to execute it
|
||||
Procedure procC = queue.poll();
|
||||
Procedure<?> procC = queue.poll();
|
||||
assertEquals(5, procC.getProcId());
|
||||
assertEquals(false, queue.waitRegions(procC, tableName, regionC));
|
||||
|
||||
|
@ -466,15 +472,18 @@ public class TestMasterProcedureScheduler {
|
|||
@Test
|
||||
public void testVerifySubProcRegionLocks() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final HRegionInfo regionA = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
|
||||
final HRegionInfo regionB = new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c"));
|
||||
final HRegionInfo regionC = new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d"));
|
||||
final RegionInfo regionA = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
|
||||
final RegionInfo regionB = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("b")).setEndKey(Bytes.toBytes("c")).build();
|
||||
final RegionInfo regionC = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("c")).setEndKey(Bytes.toBytes("d")).build();
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.ENABLE));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.ENABLE));
|
||||
|
||||
// Fetch the 1st item from the queue, "the root procedure" and take the table lock
|
||||
Procedure rootProc = queue.poll();
|
||||
Procedure<?> rootProc = queue.poll();
|
||||
assertEquals(1, rootProc.getProcId());
|
||||
assertEquals(false, queue.waitTableExclusiveLock(rootProc, tableName));
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
@ -482,14 +491,13 @@ public class TestMasterProcedureScheduler {
|
|||
// Execute the 1st step of the root-proc.
|
||||
// we should get 3 sub-proc back, one for each region.
|
||||
// (this step is done by the executor/rootProc, we are simulating it)
|
||||
Procedure[] subProcs = new Procedure[] {
|
||||
Procedure<?>[] subProcs = new Procedure[] {
|
||||
new TestRegionProcedure(1, 2, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionA),
|
||||
new TestRegionProcedure(1, 3, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionB),
|
||||
new TestRegionProcedure(1, 4, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionC),
|
||||
};
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionC), };
|
||||
|
||||
// at this point the rootProc is going in a waiting state
|
||||
// and the sub-procedures will be added in the queue.
|
||||
|
@ -502,7 +510,7 @@ public class TestMasterProcedureScheduler {
|
|||
// we should be able to fetch and execute all the sub-procs,
|
||||
// since they are operating on different regions
|
||||
for (int i = 0; i < subProcs.length; ++i) {
|
||||
TestRegionProcedure regionProc = (TestRegionProcedure)queue.poll(0);
|
||||
TestRegionProcedure regionProc = (TestRegionProcedure) queue.poll(0);
|
||||
assertEquals(subProcs[i].getProcId(), regionProc.getProcId());
|
||||
assertEquals(false, queue.waitRegions(regionProc, tableName, regionProc.getRegionInfo()));
|
||||
}
|
||||
|
@ -512,7 +520,7 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
// release all the region locks
|
||||
for (int i = 0; i < subProcs.length; ++i) {
|
||||
TestRegionProcedure regionProc = (TestRegionProcedure)subProcs[i];
|
||||
TestRegionProcedure regionProc = (TestRegionProcedure) subProcs[i];
|
||||
queue.wakeRegions(regionProc, tableName, regionProc.getRegionInfo());
|
||||
}
|
||||
|
||||
|
@ -526,27 +534,28 @@ public class TestMasterProcedureScheduler {
|
|||
@Test
|
||||
public void testInheritedRegionXLock() {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final HRegionInfo region = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
|
||||
final RegionInfo region = RegionInfoBuilder.newBuilder(tableName)
|
||||
.setStartKey(Bytes.toBytes("a")).setEndKey(Bytes.toBytes("b")).build();
|
||||
|
||||
queue.addBack(new TestRegionProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_SPLIT, region));
|
||||
TableProcedureInterface.TableOperationType.REGION_SPLIT, region));
|
||||
queue.addBack(new TestRegionProcedure(1, 2, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_UNASSIGN, region));
|
||||
TableProcedureInterface.TableOperationType.REGION_UNASSIGN, region));
|
||||
queue.addBack(new TestRegionProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, region));
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, region));
|
||||
|
||||
// fetch the root proc and take the lock on the region
|
||||
Procedure rootProc = queue.poll();
|
||||
Procedure<?> rootProc = queue.poll();
|
||||
assertEquals(1, rootProc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(rootProc, region));
|
||||
|
||||
// fetch the sub-proc and take the lock on the region (inherited lock)
|
||||
Procedure childProc = queue.poll();
|
||||
Procedure<?> childProc = queue.poll();
|
||||
assertEquals(2, childProc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(childProc, region));
|
||||
|
||||
// proc-3 will be fetched but it can't take the lock
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(3, proc.getProcId());
|
||||
assertEquals(true, queue.waitRegion(proc, region));
|
||||
|
||||
|
@ -570,16 +579,16 @@ public class TestMasterProcedureScheduler {
|
|||
public void testSuspendedProcedure() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
|
||||
// suspend
|
||||
ProcedureEvent event = new ProcedureEvent("testSuspendedProcedureEvent");
|
||||
ProcedureEvent<?> event = new ProcedureEvent<>("testSuspendedProcedureEvent");
|
||||
assertEquals(true, event.suspendIfNotReady(proc));
|
||||
|
||||
proc = queue.poll();
|
||||
|
@ -594,51 +603,50 @@ public class TestMasterProcedureScheduler {
|
|||
assertEquals(null, queue.poll(0));
|
||||
}
|
||||
|
||||
private static HRegionInfo[] generateRegionInfo(final TableName tableName) {
|
||||
return new HRegionInfo[] {
|
||||
new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")),
|
||||
new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")),
|
||||
new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")),
|
||||
};
|
||||
private static RegionInfo[] generateRegionInfo(final TableName tableName) {
|
||||
return new RegionInfo[] {
|
||||
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("a"))
|
||||
.setEndKey(Bytes.toBytes("b")).build(),
|
||||
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("b"))
|
||||
.setEndKey(Bytes.toBytes("c")).build(),
|
||||
RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("c"))
|
||||
.setEndKey(Bytes.toBytes("d")).build() };
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParentXLockAndChildrenSharedLock() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final HRegionInfo[] regions = generateRegionInfo(tableName);
|
||||
final RegionInfo[] regions = generateRegionInfo(tableName);
|
||||
final TestRegionProcedure[] childProcs = new TestRegionProcedure[regions.length];
|
||||
for (int i = 0; i < regions.length; ++i) {
|
||||
childProcs[i] = new TestRegionProcedure(1, 2 + i, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
|
||||
TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
|
||||
}
|
||||
testInheritedXLockAndChildrenSharedLock(tableName,
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.CREATE),
|
||||
childProcs
|
||||
);
|
||||
childProcs);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRootXLockAndChildrenSharedLock() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
final HRegionInfo[] regions = generateRegionInfo(tableName);
|
||||
final RegionInfo[] regions = generateRegionInfo(tableName);
|
||||
final TestRegionProcedure[] childProcs = new TestRegionProcedure[regions.length];
|
||||
for (int i = 0; i < regions.length; ++i) {
|
||||
childProcs[i] = new TestRegionProcedure(1, 2, 3 + i, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
|
||||
TableProcedureInterface.TableOperationType.REGION_ASSIGN, regions[i]);
|
||||
}
|
||||
testInheritedXLockAndChildrenSharedLock(tableName,
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.CREATE),
|
||||
childProcs
|
||||
);
|
||||
childProcs);
|
||||
}
|
||||
|
||||
private void testInheritedXLockAndChildrenSharedLock(final TableName tableName,
|
||||
final TestTableProcedure rootProc, final TestRegionProcedure[] childProcs)
|
||||
throws Exception {
|
||||
final TestTableProcedure rootProc, final TestRegionProcedure[] childProcs) throws Exception {
|
||||
queue.addBack(rootProc);
|
||||
|
||||
// fetch and acquire first xlock proc
|
||||
Procedure parentProc = queue.poll();
|
||||
Procedure<?> parentProc = queue.poll();
|
||||
assertEquals(rootProc, parentProc);
|
||||
assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
|
||||
|
||||
|
@ -648,12 +656,12 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
|
||||
// add another xlock procedure (no parent)
|
||||
queue.addBack(new TestTableProcedure(100, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(100, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// fetch and execute child
|
||||
for (int i = 0; i < childProcs.length; ++i) {
|
||||
TestRegionProcedure childProc = (TestRegionProcedure)queue.poll();
|
||||
TestRegionProcedure childProc = (TestRegionProcedure) queue.poll();
|
||||
LOG.debug("fetch children " + childProc);
|
||||
assertEquals(false, queue.waitRegions(childProc, tableName, childProc.getRegionInfo()));
|
||||
queue.wakeRegions(childProc, tableName, childProc.getRegionInfo());
|
||||
|
@ -666,7 +674,7 @@ public class TestMasterProcedureScheduler {
|
|||
queue.wakeTableExclusiveLock(parentProc, tableName);
|
||||
|
||||
// fetch the other xlock proc
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(100, proc.getProcId());
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
queue.wakeTableExclusiveLock(proc, tableName);
|
||||
|
@ -677,8 +685,7 @@ public class TestMasterProcedureScheduler {
|
|||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
testInheritedXLockAndChildrenXLock(tableName,
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT),
|
||||
new TestTableProcedure(1, 2, tableName, TableProcedureInterface.TableOperationType.EDIT)
|
||||
);
|
||||
new TestTableProcedure(1, 2, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -687,8 +694,7 @@ public class TestMasterProcedureScheduler {
|
|||
// simulate 3 procedures: 1 (root), (2) child of root, (3) child of proc-2
|
||||
testInheritedXLockAndChildrenXLock(tableName,
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT),
|
||||
new TestTableProcedure(1, 2, 3, tableName, TableProcedureInterface.TableOperationType.EDIT)
|
||||
);
|
||||
new TestTableProcedure(1, 2, 3, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
}
|
||||
|
||||
private void testInheritedXLockAndChildrenXLock(final TableName tableName,
|
||||
|
@ -696,7 +702,7 @@ public class TestMasterProcedureScheduler {
|
|||
queue.addBack(rootProc);
|
||||
|
||||
// fetch and acquire first xlock proc
|
||||
Procedure parentProc = queue.poll();
|
||||
Procedure<?> parentProc = queue.poll();
|
||||
assertEquals(rootProc, parentProc);
|
||||
assertEquals(false, queue.waitTableExclusiveLock(parentProc, tableName));
|
||||
|
||||
|
@ -704,7 +710,7 @@ public class TestMasterProcedureScheduler {
|
|||
queue.addFront(childProc);
|
||||
|
||||
// fetch the other xlock proc
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(childProc, proc);
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
queue.wakeTableExclusiveLock(proc, tableName);
|
||||
|
@ -717,13 +723,13 @@ public class TestMasterProcedureScheduler {
|
|||
public void testYieldWithXLockHeld() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// fetch from the queue and acquire xlock for the first proc
|
||||
Procedure proc = queue.poll();
|
||||
Procedure<?> proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(false, queue.waitTableExclusiveLock(proc, tableName));
|
||||
|
||||
|
@ -748,20 +754,20 @@ public class TestMasterProcedureScheduler {
|
|||
public void testYieldWithSharedLockHeld() throws Exception {
|
||||
final TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(1, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(2, tableName, TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(
|
||||
new TestTableProcedure(3, tableName, TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// fetch and acquire the first shared-lock
|
||||
Procedure proc1 = queue.poll();
|
||||
Procedure<?> proc1 = queue.poll();
|
||||
assertEquals(1, proc1.getProcId());
|
||||
assertEquals(false, queue.waitTableSharedLock(proc1, tableName));
|
||||
|
||||
// fetch and acquire the second shared-lock
|
||||
Procedure proc2 = queue.poll();
|
||||
Procedure<?> proc2 = queue.poll();
|
||||
assertEquals(2, proc2.getProcId());
|
||||
assertEquals(false, queue.waitTableSharedLock(proc2, tableName));
|
||||
|
||||
|
@ -769,8 +775,8 @@ public class TestMasterProcedureScheduler {
|
|||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// put the procs back in the queue
|
||||
queue.yield(proc2);
|
||||
queue.yield(proc1);
|
||||
queue.yield(proc2);
|
||||
|
||||
// fetch from the queue, it should fetch the ones with just added back
|
||||
proc1 = queue.poll();
|
||||
|
@ -782,12 +788,11 @@ public class TestMasterProcedureScheduler {
|
|||
queue.wakeTableSharedLock(proc1, tableName);
|
||||
queue.wakeTableSharedLock(proc2, tableName);
|
||||
|
||||
Procedure proc3 = queue.poll();
|
||||
Procedure<?> proc3 = queue.poll();
|
||||
assertEquals(3, proc3.getProcId());
|
||||
}
|
||||
|
||||
public static class TestTableProcedure extends TestProcedure
|
||||
implements TableProcedureInterface {
|
||||
public static class TestTableProcedure extends TestProcedure implements TableProcedureInterface {
|
||||
private final TableOperationType opType;
|
||||
private final TableName tableName;
|
||||
|
||||
|
@ -810,6 +815,7 @@ public class TestMasterProcedureScheduler {
|
|||
this.tableName = tableName;
|
||||
this.opType = opType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return tableName;
|
||||
|
@ -830,14 +836,15 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
|
||||
public static class TestTableProcedureWithEvent extends TestTableProcedure {
|
||||
private final ProcedureEvent event;
|
||||
private final ProcedureEvent<?> event;
|
||||
|
||||
public TestTableProcedureWithEvent(long procId, TableName tableName, TableOperationType opType) {
|
||||
public TestTableProcedureWithEvent(long procId, TableName tableName,
|
||||
TableOperationType opType) {
|
||||
super(procId, tableName, opType);
|
||||
event = new ProcedureEvent(tableName + " procId=" + procId);
|
||||
event = new ProcedureEvent<>(tableName + " procId=" + procId);
|
||||
}
|
||||
|
||||
public ProcedureEvent getEvent() {
|
||||
public ProcedureEvent<?> getEvent() {
|
||||
return event;
|
||||
}
|
||||
}
|
||||
|
@ -933,7 +940,8 @@ public class TestMasterProcedureScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private static LockProcedure createLockProcedure(LockType lockType, long procId) throws Exception {
|
||||
private static LockProcedure createLockProcedure(LockType lockType, long procId)
|
||||
throws Exception {
|
||||
LockProcedure procedure = new LockProcedure();
|
||||
|
||||
Field typeField = LockProcedure.class.getDeclaredField("type");
|
||||
|
@ -1001,7 +1009,7 @@ public class TestMasterProcedureScheduler {
|
|||
|
||||
LockedResource tableResource = locks.get(1);
|
||||
assertLockResource(tableResource, LockedResourceType.TABLE,
|
||||
TableName.NAMESPACE_TABLE_NAME.getNameAsString());
|
||||
TableName.NAMESPACE_TABLE_NAME.getNameAsString());
|
||||
assertSharedLock(tableResource, 1);
|
||||
assertTrue(tableResource.getWaitingProcedures().isEmpty());
|
||||
}
|
||||
|
@ -1028,7 +1036,8 @@ public class TestMasterProcedureScheduler {
|
|||
@Test
|
||||
public void testListLocksRegion() throws Exception {
|
||||
LockProcedure procedure = createExclusiveLockProcedure(3);
|
||||
HRegionInfo regionInfo = new HRegionInfo(TableName.valueOf("ns3", "table3"));
|
||||
RegionInfo regionInfo =
|
||||
RegionInfoBuilder.newBuilder(TableName.valueOf("ns3", "table3")).build();
|
||||
|
||||
queue.waitRegion(procedure, regionInfo);
|
||||
|
||||
|
@ -1144,4 +1153,3 @@ public class TestMasterProcedureScheduler {
|
|||
queue.wakeTableExclusiveLock(parentProc, tableName);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ public class TestMasterProcedureSchedulerConcurrency {
|
|||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
queue = new MasterProcedureScheduler();
|
||||
queue = new MasterProcedureScheduler(pid -> null);
|
||||
queue.start();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,276 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master.procedure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
|
||||
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
|
||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
|
||||
@Category({ MasterTests.class, LargeTests.class })
|
||||
public class TestSchedulerQueueDeadLock {
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestSchedulerQueueDeadLock.class);
|
||||
|
||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||
|
||||
private static final TableName TABLE_NAME = TableName.valueOf("deadlock");
|
||||
|
||||
private static final class TestEnv {
|
||||
private final MasterProcedureScheduler scheduler;
|
||||
|
||||
public TestEnv(MasterProcedureScheduler scheduler) {
|
||||
this.scheduler = scheduler;
|
||||
}
|
||||
|
||||
public MasterProcedureScheduler getScheduler() {
|
||||
return scheduler;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TableSharedProcedure extends NoopProcedure<TestEnv>
|
||||
implements TableProcedureInterface {
|
||||
|
||||
private final Semaphore latch = new Semaphore(0);
|
||||
|
||||
@Override
|
||||
protected Procedure<TestEnv>[] execute(TestEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
latch.acquire();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LockState acquireLock(TestEnv env) {
|
||||
if (env.getScheduler().waitTableSharedLock(this, getTableName())) {
|
||||
return LockState.LOCK_EVENT_WAIT;
|
||||
}
|
||||
return LockState.LOCK_ACQUIRED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(TestEnv env) {
|
||||
env.getScheduler().wakeTableSharedLock(this, getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean holdLock(TestEnv env) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return TABLE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.READ;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TableExclusiveProcedure extends NoopProcedure<TestEnv>
|
||||
implements TableProcedureInterface {
|
||||
|
||||
private final Semaphore latch = new Semaphore(0);
|
||||
|
||||
@Override
|
||||
protected Procedure<TestEnv>[] execute(TestEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
latch.acquire();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LockState acquireLock(TestEnv env) {
|
||||
if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) {
|
||||
return LockState.LOCK_EVENT_WAIT;
|
||||
}
|
||||
return LockState.LOCK_ACQUIRED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(TestEnv env) {
|
||||
env.getScheduler().wakeTableExclusiveLock(this, getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean holdLock(TestEnv env) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return TABLE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.EDIT;
|
||||
}
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws IOException {
|
||||
UTIL.cleanupTestDir();
|
||||
}
|
||||
|
||||
private WALProcedureStore procStore;
|
||||
|
||||
private ProcedureExecutor<TestEnv> procExec;
|
||||
|
||||
@Rule
|
||||
public final TestName name = new TestName();
|
||||
|
||||
@Before
|
||||
public void setUp() throws IOException {
|
||||
UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000);
|
||||
procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(),
|
||||
UTIL.getDataTestDir(name.getMethodName()));
|
||||
procStore.start(1);
|
||||
MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null);
|
||||
procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore,
|
||||
scheduler);
|
||||
procExec.init(1, false);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
procExec.stop();
|
||||
procStore.stop(false);
|
||||
}
|
||||
|
||||
public static final class TableSharedProcedureWithId extends TableSharedProcedure {
|
||||
|
||||
@Override
|
||||
protected void setProcId(long procId) {
|
||||
// this is a hack to make this procedure be loaded after the procedure below as we will sort
|
||||
// the procedures by id when loading.
|
||||
super.setProcId(2L);
|
||||
}
|
||||
}
|
||||
|
||||
public static final class TableExclusiveProcedureWithId extends TableExclusiveProcedure {
|
||||
|
||||
@Override
|
||||
protected void setProcId(long procId) {
|
||||
// this is a hack to make this procedure be loaded before the procedure above as we will
|
||||
// sort the procedures by id when loading.
|
||||
super.setProcId(1L);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableProcedureDeadLockAfterRestarting() throws Exception {
|
||||
// let the shared procedure run first, but let it have a greater procId so when loading it will
|
||||
// be loaded at last.
|
||||
long procId1 = procExec.submitProcedure(new TableSharedProcedureWithId());
|
||||
long procId2 = procExec.submitProcedure(new TableExclusiveProcedureWithId());
|
||||
procExec.startWorkers();
|
||||
UTIL.waitFor(10000,
|
||||
() -> ((TableSharedProcedure) procExec.getProcedure(procId1)).latch.hasQueuedThreads());
|
||||
|
||||
ProcedureTestingUtility.restart(procExec);
|
||||
|
||||
((TableSharedProcedure) procExec.getProcedure(procId1)).latch.release();
|
||||
((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release();
|
||||
|
||||
UTIL.waitFor(10000, () -> procExec.isFinished(procId1));
|
||||
UTIL.waitFor(10000, () -> procExec.isFinished(procId2));
|
||||
}
|
||||
|
||||
public static final class TableShardParentProcedure extends NoopProcedure<TestEnv>
|
||||
implements TableProcedureInterface {
|
||||
|
||||
private boolean scheduled;
|
||||
|
||||
@Override
|
||||
protected Procedure<TestEnv>[] execute(TestEnv env)
|
||||
throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
|
||||
if (!scheduled) {
|
||||
scheduled = true;
|
||||
return new Procedure[] { new TableSharedProcedure() };
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LockState acquireLock(TestEnv env) {
|
||||
if (env.getScheduler().waitTableSharedLock(this, getTableName())) {
|
||||
return LockState.LOCK_EVENT_WAIT;
|
||||
}
|
||||
return LockState.LOCK_ACQUIRED;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void releaseLock(TestEnv env) {
|
||||
env.getScheduler().wakeTableSharedLock(this, getTableName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean holdLock(TestEnv env) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableName getTableName() {
|
||||
return TABLE_NAME;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TableOperationType getTableOperationType() {
|
||||
return TableOperationType.READ;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTableProcedureSubProcedureDeadLock() throws Exception {
|
||||
// the shared procedure will also schedule a shared procedure, but after the exclusive procedure
|
||||
long procId1 = procExec.submitProcedure(new TableShardParentProcedure());
|
||||
long procId2 = procExec.submitProcedure(new TableExclusiveProcedure());
|
||||
procExec.startWorkers();
|
||||
UTIL.waitFor(10000,
|
||||
() -> procExec.getProcedures().stream().anyMatch(p -> p instanceof TableSharedProcedure));
|
||||
procExec.getProcedures().stream().filter(p -> p instanceof TableSharedProcedure)
|
||||
.map(p -> (TableSharedProcedure) p).forEach(p -> p.latch.release());
|
||||
((TableExclusiveProcedure) procExec.getProcedure(procId2)).latch.release();
|
||||
|
||||
UTIL.waitFor(10000, () -> procExec.isFinished(procId1));
|
||||
UTIL.waitFor(10000, () -> procExec.isFinished(procId2));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue