HBASE-20847 The parent procedure of RegionTransitionProcedure may not have the table lock
This commit is contained in:
parent
176afd998a
commit
430b056ade
|
@ -253,22 +253,16 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
||||||
* Wakes up given waiting procedures by pushing them back into scheduler queues.
|
* Wakes up given waiting procedures by pushing them back into scheduler queues.
|
||||||
* @return size of given {@code waitQueue}.
|
* @return size of given {@code waitQueue}.
|
||||||
*/
|
*/
|
||||||
protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) {
|
protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) {
|
||||||
int count = waitQueue.size();
|
return lockAndQueue.wakeWaitingProcedures(this);
|
||||||
// wakeProcedure adds to the front of queue, so we start from last in the
|
|
||||||
// waitQueue' queue, so that the procedure which was added first goes in the front for
|
|
||||||
// the scheduler queue.
|
|
||||||
addFront(waitQueue.descendingIterator());
|
|
||||||
waitQueue.clear();
|
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) {
|
protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) {
|
||||||
waitQueue.addLast(proc);
|
lockAndQueue.addLast(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void wakeProcedure(final Procedure procedure) {
|
protected void wakeProcedure(final Procedure procedure) {
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("Wake " + procedure);
|
LOG.trace("Wake {}", procedure);
|
||||||
push(procedure, /* addFront= */ true, /* notify= */false);
|
push(procedure, /* addFront= */ true, /* notify= */false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -285,7 +279,9 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void wakePollIfNeeded(final int waitingCount) {
|
protected void wakePollIfNeeded(final int waitingCount) {
|
||||||
if (waitingCount <= 0) return;
|
if (waitingCount <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
if (waitingCount == 1) {
|
if (waitingCount == 1) {
|
||||||
schedWaitCond.signal();
|
schedWaitCond.signal();
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.procedure2;
|
package org.apache.hadoop.hbase.procedure2;
|
||||||
|
|
||||||
|
import java.util.function.Predicate;
|
||||||
|
import java.util.stream.Stream;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -45,7 +47,8 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
|
* We do not use ReentrantReadWriteLock directly because of its high memory overhead.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class LockAndQueue extends ProcedureDeque implements LockStatus {
|
public class LockAndQueue implements LockStatus {
|
||||||
|
private final ProcedureDeque queue = new ProcedureDeque();
|
||||||
private Procedure<?> exclusiveLockOwnerProcedure = null;
|
private Procedure<?> exclusiveLockOwnerProcedure = null;
|
||||||
private int sharedLock = 0;
|
private int sharedLock = 0;
|
||||||
|
|
||||||
|
@ -69,12 +72,14 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasParentLock(final Procedure proc) {
|
public boolean hasParentLock(Procedure<?> proc) {
|
||||||
return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
|
// TODO: need to check all the ancestors
|
||||||
|
return proc.hasParent() &&
|
||||||
|
(isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasLockAccess(final Procedure proc) {
|
public boolean hasLockAccess(Procedure<?> proc) {
|
||||||
return isLockOwner(proc.getProcId()) || hasParentLock(proc);
|
return isLockOwner(proc.getProcId()) || hasParentLock(proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,37 +106,87 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus {
|
||||||
// try/release Shared/Exclusive lock
|
// try/release Shared/Exclusive lock
|
||||||
// ======================================================================
|
// ======================================================================
|
||||||
|
|
||||||
public boolean trySharedLock() {
|
/**
|
||||||
if (hasExclusiveLock()) return false;
|
* @return whether we have succesfully acquired the shared lock.
|
||||||
|
*/
|
||||||
|
public boolean trySharedLock(Procedure<?> proc) {
|
||||||
|
if (hasExclusiveLock() && !hasLockAccess(proc)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// If no one holds the xlock, then we are free to hold the sharedLock
|
||||||
|
// If the parent proc or we have already held the xlock, then we return true here as
|
||||||
|
// xlock is more powerful then shared lock.
|
||||||
sharedLock++;
|
sharedLock++;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return whether we should wake the procedures waiting on the lock here.
|
||||||
|
*/
|
||||||
public boolean releaseSharedLock() {
|
public boolean releaseSharedLock() {
|
||||||
return --sharedLock == 0;
|
// hasExclusiveLock could be true, it usually means we acquire shared lock while we or our
|
||||||
|
// parent have held the xlock. And since there is still an exclusive lock, we do not need to
|
||||||
|
// wake any procedures.
|
||||||
|
return --sharedLock == 0 && !hasExclusiveLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean tryExclusiveLock(final Procedure proc) {
|
public boolean tryExclusiveLock(Procedure<?> proc) {
|
||||||
if (isLocked()) return hasLockAccess(proc);
|
if (isLocked()) {
|
||||||
|
return hasLockAccess(proc);
|
||||||
|
}
|
||||||
exclusiveLockOwnerProcedure = proc;
|
exclusiveLockOwnerProcedure = proc;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return True if we released a lock.
|
* @return whether we should wake the procedures waiting on the lock here.
|
||||||
*/
|
*/
|
||||||
public boolean releaseExclusiveLock(final Procedure proc) {
|
public boolean releaseExclusiveLock(Procedure<?> proc) {
|
||||||
if (isLockOwner(proc.getProcId())) {
|
if (!isLockOwner(proc.getProcId())) {
|
||||||
exclusiveLockOwnerProcedure = null;
|
// We are not the lock owner, it is probably inherited from the parent procedures.
|
||||||
return true;
|
return false;
|
||||||
}
|
}
|
||||||
return false;
|
exclusiveLockOwnerProcedure = null;
|
||||||
|
// This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent
|
||||||
|
// proc or we have already held the xlock, and also allow releasing the locks in any order, so
|
||||||
|
// it could happen that the xlock is released but there are still some procs holding the shared
|
||||||
|
// lock.
|
||||||
|
// In HBase, this could happen when a proc which holdLock is false and schedules sub procs which
|
||||||
|
// acquire the shared lock on the same lock. This is because we will schedule the sub proces
|
||||||
|
// before releasing the lock, so the sub procs could call acquire lock before we releasing the
|
||||||
|
// xlock.
|
||||||
|
return sharedLock == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isWaitingQueueEmpty() {
|
||||||
|
return queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Procedure<?> removeFirst() {
|
||||||
|
return queue.removeFirst();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addLast(Procedure<?> proc) {
|
||||||
|
queue.addLast(proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int wakeWaitingProcedures(ProcedureScheduler scheduler) {
|
||||||
|
int count = queue.size();
|
||||||
|
// wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so
|
||||||
|
// that the procedure which was added first goes in the front for the scheduler queue.
|
||||||
|
scheduler.addFront(queue.descendingIterator());
|
||||||
|
queue.clear();
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("rawtypes")
|
||||||
|
public Stream<Procedure> filterWaitingQueue(Predicate<Procedure> predicate) {
|
||||||
|
return queue.stream().filter(predicate);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return "exclusiveLockOwner=" + (hasExclusiveLock()? getExclusiveLockProcIdOwner(): "NONE") +
|
return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") +
|
||||||
", sharedLockCount=" + getSharedLockCount() +
|
", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size();
|
||||||
", waitingProcCount=" + size();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,9 +33,9 @@ public interface LockStatus {
|
||||||
|
|
||||||
boolean isLockOwner(long procId);
|
boolean isLockOwner(long procId);
|
||||||
|
|
||||||
boolean hasParentLock(final Procedure proc);
|
boolean hasParentLock(Procedure<?> proc);
|
||||||
|
|
||||||
boolean hasLockAccess(final Procedure proc);
|
boolean hasLockAccess(Procedure<?> proc);
|
||||||
|
|
||||||
Procedure<?> getExclusiveLockOwnerProcedure();
|
Procedure<?> getExclusiveLockOwnerProcedure();
|
||||||
|
|
||||||
|
|
|
@ -531,7 +531,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
final String namespace = table.getNamespaceAsString();
|
final String namespace = table.getNamespaceAsString();
|
||||||
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
|
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
|
||||||
final LockAndQueue tableLock = locking.getTableLock(table);
|
final LockAndQueue tableLock = locking.getTableLock(table);
|
||||||
if (!namespaceLock.trySharedLock()) {
|
if (!namespaceLock.trySharedLock(procedure)) {
|
||||||
waitProcedure(namespaceLock, procedure);
|
waitProcedure(namespaceLock, procedure);
|
||||||
logLockedResource(LockedResourceType.NAMESPACE, namespace);
|
logLockedResource(LockedResourceType.NAMESPACE, namespace);
|
||||||
return true;
|
return true;
|
||||||
|
@ -560,9 +560,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
|
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
|
||||||
final LockAndQueue tableLock = locking.getTableLock(table);
|
final LockAndQueue tableLock = locking.getTableLock(table);
|
||||||
int waitingCount = 0;
|
int waitingCount = 0;
|
||||||
|
if (tableLock.releaseExclusiveLock(procedure)) {
|
||||||
if (!tableLock.hasParentLock(procedure)) {
|
|
||||||
tableLock.releaseExclusiveLock(procedure);
|
|
||||||
waitingCount += wakeWaitingProcedures(tableLock);
|
waitingCount += wakeWaitingProcedures(tableLock);
|
||||||
}
|
}
|
||||||
if (namespaceLock.releaseSharedLock()) {
|
if (namespaceLock.releaseSharedLock()) {
|
||||||
|
@ -591,12 +589,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
try {
|
try {
|
||||||
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
|
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
|
||||||
final LockAndQueue tableLock = locking.getTableLock(table);
|
final LockAndQueue tableLock = locking.getTableLock(table);
|
||||||
if (!namespaceLock.trySharedLock()) {
|
if (!namespaceLock.trySharedLock(procedure)) {
|
||||||
waitProcedure(namespaceLock, procedure);
|
waitProcedure(namespaceLock, procedure);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tableLock.trySharedLock()) {
|
if (!tableLock.trySharedLock(procedure)) {
|
||||||
namespaceLock.releaseSharedLock();
|
namespaceLock.releaseSharedLock();
|
||||||
waitProcedure(tableLock, procedure);
|
waitProcedure(tableLock, procedure);
|
||||||
return null;
|
return null;
|
||||||
|
@ -690,11 +688,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
|
Arrays.sort(regionInfo, RegionInfo.COMPARATOR);
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
// If there is parent procedure, it would have already taken xlock, so no need to take
|
assert table != null;
|
||||||
// shared lock here. Otherwise, take shared lock.
|
if (waitTableSharedLock(procedure, table)) {
|
||||||
if (!procedure.hasParent()
|
return true;
|
||||||
&& waitTableQueueSharedLock(procedure, table) == null) {
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// acquire region xlocks or wait
|
// acquire region xlocks or wait
|
||||||
|
@ -702,7 +698,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
|
final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length];
|
||||||
for (int i = 0; i < regionInfo.length; ++i) {
|
for (int i = 0; i < regionInfo.length; ++i) {
|
||||||
LOG.info("{} checking lock on {}", procedure, regionInfo[i].getEncodedName());
|
LOG.info("{} checking lock on {}", procedure, regionInfo[i].getEncodedName());
|
||||||
assert table != null;
|
|
||||||
assert regionInfo[i] != null;
|
assert regionInfo[i] != null;
|
||||||
assert regionInfo[i].getTable() != null;
|
assert regionInfo[i].getTable() != null;
|
||||||
assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
|
assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure;
|
||||||
|
@ -719,7 +714,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasLock && !procedure.hasParent()) {
|
if (!hasLock) {
|
||||||
wakeTableSharedLock(procedure, table);
|
wakeTableSharedLock(procedure, table);
|
||||||
}
|
}
|
||||||
return !hasLock;
|
return !hasLock;
|
||||||
|
@ -748,14 +743,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
int numProcs = 0;
|
int numProcs = 0;
|
||||||
final Procedure[] nextProcs = new Procedure[regionInfo.length];
|
final Procedure<?>[] nextProcs = new Procedure[regionInfo.length];
|
||||||
for (int i = 0; i < regionInfo.length; ++i) {
|
for (int i = 0; i < regionInfo.length; ++i) {
|
||||||
assert regionInfo[i].getTable().equals(table);
|
assert regionInfo[i].getTable().equals(table);
|
||||||
assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
|
assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i];
|
||||||
|
|
||||||
LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
|
LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName());
|
||||||
if (regionLock.releaseExclusiveLock(procedure)) {
|
if (regionLock.releaseExclusiveLock(procedure)) {
|
||||||
if (!regionLock.isEmpty()) {
|
if (!regionLock.isWaitingQueueEmpty()) {
|
||||||
// release one procedure at the time since regions has an xlock
|
// release one procedure at the time since regions has an xlock
|
||||||
nextProcs[numProcs++] = regionLock.removeFirst();
|
nextProcs[numProcs++] = regionLock.removeFirst();
|
||||||
} else {
|
} else {
|
||||||
|
@ -769,11 +764,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
wakeProcedure(nextProcs[i]);
|
wakeProcedure(nextProcs[i]);
|
||||||
}
|
}
|
||||||
wakePollIfNeeded(numProcs);
|
wakePollIfNeeded(numProcs);
|
||||||
if (!procedure.hasParent()) {
|
// release the table shared-lock.
|
||||||
// release the table shared-lock.
|
wakeTableSharedLock(procedure, table);
|
||||||
// (if we have a parent, it is holding an xlock so we didn't take the shared-lock)
|
|
||||||
wakeTableSharedLock(procedure, table);
|
|
||||||
}
|
|
||||||
} finally {
|
} finally {
|
||||||
schedUnlock();
|
schedUnlock();
|
||||||
}
|
}
|
||||||
|
@ -789,15 +781,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
* @param namespace Namespace to lock
|
* @param namespace Namespace to lock
|
||||||
* @return true if the procedure has to wait for the namespace to be available
|
* @return true if the procedure has to wait for the namespace to be available
|
||||||
*/
|
*/
|
||||||
public boolean waitNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
|
public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
final LockAndQueue systemNamespaceTableLock =
|
final LockAndQueue systemNamespaceTableLock =
|
||||||
locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
|
locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
|
||||||
if (!systemNamespaceTableLock.trySharedLock()) {
|
if (!systemNamespaceTableLock.trySharedLock(procedure)) {
|
||||||
waitProcedure(systemNamespaceTableLock, procedure);
|
waitProcedure(systemNamespaceTableLock, procedure);
|
||||||
logLockedResource(LockedResourceType.TABLE,
|
logLockedResource(LockedResourceType.TABLE,
|
||||||
TableName.NAMESPACE_TABLE_NAME.getNameAsString());
|
TableName.NAMESPACE_TABLE_NAME.getNameAsString());
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -826,13 +818,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
|
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
|
||||||
final LockAndQueue systemNamespaceTableLock =
|
final LockAndQueue systemNamespaceTableLock =
|
||||||
locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
|
locking.getTableLock(TableName.NAMESPACE_TABLE_NAME);
|
||||||
namespaceLock.releaseExclusiveLock(procedure);
|
|
||||||
int waitingCount = 0;
|
int waitingCount = 0;
|
||||||
|
if (namespaceLock.releaseExclusiveLock(procedure)) {
|
||||||
|
waitingCount += wakeWaitingProcedures(namespaceLock);
|
||||||
|
}
|
||||||
if (systemNamespaceTableLock.releaseSharedLock()) {
|
if (systemNamespaceTableLock.releaseSharedLock()) {
|
||||||
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME));
|
addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME));
|
||||||
waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
|
waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
|
||||||
}
|
}
|
||||||
waitingCount += wakeWaitingProcedures(namespaceLock);
|
|
||||||
wakePollIfNeeded(waitingCount);
|
wakePollIfNeeded(waitingCount);
|
||||||
} finally {
|
} finally {
|
||||||
schedUnlock();
|
schedUnlock();
|
||||||
|
@ -878,6 +871,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
final LockAndQueue lock = locking.getServerLock(serverName);
|
final LockAndQueue lock = locking.getServerLock(serverName);
|
||||||
|
// Only SCP will acquire/release server lock so do not need to check the return value here.
|
||||||
lock.releaseExclusiveLock(procedure);
|
lock.releaseExclusiveLock(procedure);
|
||||||
// We do not need to create a new queue so just pass null, as in tests we may pass procedures
|
// We do not need to create a new queue so just pass null, as in tests we may pass procedures
|
||||||
// other than ServerProcedureInterface
|
// other than ServerProcedureInterface
|
||||||
|
@ -929,10 +923,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
final LockAndQueue lock = locking.getPeerLock(peerId);
|
final LockAndQueue lock = locking.getPeerLock(peerId);
|
||||||
lock.releaseExclusiveLock(procedure);
|
if (lock.releaseExclusiveLock(procedure)) {
|
||||||
addToRunQueue(peerRunQueue, getPeerQueue(peerId));
|
addToRunQueue(peerRunQueue, getPeerQueue(peerId));
|
||||||
int waitingCount = wakeWaitingProcedures(lock);
|
int waitingCount = wakeWaitingProcedures(lock);
|
||||||
wakePollIfNeeded(waitingCount);
|
wakePollIfNeeded(waitingCount);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
schedUnlock();
|
schedUnlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,13 +125,8 @@ class SchemaLocking {
|
||||||
|
|
||||||
List<Procedure<?>> waitingProcedures = new ArrayList<>();
|
List<Procedure<?>> waitingProcedures = new ArrayList<>();
|
||||||
|
|
||||||
for (Procedure<?> procedure : queue) {
|
queue.filterWaitingQueue(p -> p instanceof LockProcedure)
|
||||||
if (!(procedure instanceof LockProcedure)) {
|
.forEachOrdered(waitingProcedures::add);
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
waitingProcedures.add(procedure);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure,
|
return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure,
|
||||||
sharedLockCount, waitingProcedures);
|
sharedLockCount, waitingProcedures);
|
||||||
|
|
|
@ -95,9 +95,9 @@ public class TestAdmin2 {
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
|
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
|
||||||
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.metahandler.count", 30);
|
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 30);
|
||||||
TEST_UTIL.getConfiguration().setBoolean(
|
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 30);
|
||||||
"hbase.master.enabletable.roundrobin", true);
|
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
|
||||||
TEST_UTIL.startMiniCluster(3);
|
TEST_UTIL.startMiniCluster(3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -31,7 +32,10 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockType;
|
import org.apache.hadoop.hbase.procedure2.LockType;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
||||||
|
@ -839,29 +843,29 @@ public class TestMasterProcedureScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class TestRegionProcedure extends TestTableProcedure {
|
public static class TestRegionProcedure extends TestTableProcedure {
|
||||||
private final HRegionInfo[] regionInfo;
|
private final RegionInfo[] regionInfo;
|
||||||
|
|
||||||
public TestRegionProcedure() {
|
public TestRegionProcedure() {
|
||||||
throw new UnsupportedOperationException("recovery should not be triggered here");
|
throw new UnsupportedOperationException("recovery should not be triggered here");
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType,
|
public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType,
|
||||||
HRegionInfo... regionInfo) {
|
RegionInfo... regionInfo) {
|
||||||
this(-1, procId, tableName, opType, regionInfo);
|
this(-1, procId, tableName, opType, regionInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestRegionProcedure(long parentProcId, long procId, TableName tableName,
|
public TestRegionProcedure(long parentProcId, long procId, TableName tableName,
|
||||||
TableOperationType opType, HRegionInfo... regionInfo) {
|
TableOperationType opType, RegionInfo... regionInfo) {
|
||||||
this(-1, parentProcId, procId, tableName, opType, regionInfo);
|
this(-1, parentProcId, procId, tableName, opType, regionInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TestRegionProcedure(long rootProcId, long parentProcId, long procId, TableName tableName,
|
public TestRegionProcedure(long rootProcId, long parentProcId, long procId, TableName tableName,
|
||||||
TableOperationType opType, HRegionInfo... regionInfo) {
|
TableOperationType opType, RegionInfo... regionInfo) {
|
||||||
super(rootProcId, parentProcId, procId, tableName, opType);
|
super(rootProcId, parentProcId, procId, tableName, opType);
|
||||||
this.regionInfo = regionInfo;
|
this.regionInfo = regionInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HRegionInfo[] getRegionInfo() {
|
public RegionInfo[] getRegionInfo() {
|
||||||
return regionInfo;
|
return regionInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1114,5 +1118,30 @@ public class TestMasterProcedureScheduler {
|
||||||
assertEquals(LockType.EXCLUSIVE, waitingProcedure3.getType());
|
assertEquals(LockType.EXCLUSIVE, waitingProcedure3.getType());
|
||||||
assertEquals(procedure3, waitingProcedure3);
|
assertEquals(procedure3, waitingProcedure3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAcquireSharedLockWhileParentHoldingExclusiveLock() {
|
||||||
|
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||||
|
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
|
||||||
|
|
||||||
|
TestTableProcedure parentProc = new TestTableProcedure(1, tableName, TableOperationType.EDIT);
|
||||||
|
TestRegionProcedure proc =
|
||||||
|
new TestRegionProcedure(1, 2, tableName, TableOperationType.REGION_EDIT, regionInfo);
|
||||||
|
queue.addBack(parentProc);
|
||||||
|
|
||||||
|
assertSame(parentProc, queue.poll());
|
||||||
|
assertFalse(queue.waitTableExclusiveLock(parentProc, tableName));
|
||||||
|
|
||||||
|
// The queue for this table should be added back to run queue as the parent has the xlock, so we
|
||||||
|
// can poll it out.
|
||||||
|
queue.addBack(proc);
|
||||||
|
assertSame(proc, queue.poll());
|
||||||
|
// the parent has xlock on the table, and it is OK for us to acquire shared lock on the table,
|
||||||
|
// this is what this test wants to confirm
|
||||||
|
assertFalse(queue.waitRegion(proc, regionInfo));
|
||||||
|
|
||||||
|
queue.wakeRegion(proc, regionInfo);
|
||||||
|
queue.wakeTableExclusiveLock(parentProc, tableName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue