diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index fbfa5b2f10d..c0361638abc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -253,22 +253,16 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * Wakes up given waiting procedures by pushing them back into scheduler queues. * @return size of given {@code waitQueue}. */ - protected int wakeWaitingProcedures(final ProcedureDeque waitQueue) { - int count = waitQueue.size(); - // wakeProcedure adds to the front of queue, so we start from last in the - // waitQueue' queue, so that the procedure which was added first goes in the front for - // the scheduler queue. - addFront(waitQueue.descendingIterator()); - waitQueue.clear(); - return count; + protected int wakeWaitingProcedures(LockAndQueue lockAndQueue) { + return lockAndQueue.wakeWaitingProcedures(this); } - protected void waitProcedure(final ProcedureDeque waitQueue, final Procedure proc) { - waitQueue.addLast(proc); + protected void waitProcedure(LockAndQueue lockAndQueue, final Procedure proc) { + lockAndQueue.addLast(proc); } protected void wakeProcedure(final Procedure procedure) { - if (LOG.isTraceEnabled()) LOG.trace("Wake " + procedure); + LOG.trace("Wake {}", procedure); push(procedure, /* addFront= */ true, /* notify= */false); } @@ -285,7 +279,9 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { } protected void wakePollIfNeeded(final int waitingCount) { - if (waitingCount <= 0) return; + if (waitingCount <= 0) { + return; + } if (waitingCount == 1) { schedWaitCond.signal(); } else { diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java index 427c1fc32d3..ae8daa28314 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockAndQueue.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hbase.procedure2; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.yetus.audience.InterfaceAudience; /** @@ -45,7 +47,8 @@ import org.apache.yetus.audience.InterfaceAudience; * We do not use ReentrantReadWriteLock directly because of its high memory overhead. */ @InterfaceAudience.Private -public class LockAndQueue extends ProcedureDeque implements LockStatus { +public class LockAndQueue implements LockStatus { + private final ProcedureDeque queue = new ProcedureDeque(); private Procedure exclusiveLockOwnerProcedure = null; private int sharedLock = 0; @@ -69,12 +72,14 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { } @Override - public boolean hasParentLock(final Procedure proc) { - return proc.hasParent() && (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); + public boolean hasParentLock(Procedure proc) { + // TODO: need to check all the ancestors + return proc.hasParent() && + (isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId())); } @Override - public boolean hasLockAccess(final Procedure proc) { + public boolean hasLockAccess(Procedure proc) { return isLockOwner(proc.getProcId()) || hasParentLock(proc); } @@ -101,37 +106,87 @@ public class LockAndQueue extends ProcedureDeque implements LockStatus { // try/release Shared/Exclusive lock // ====================================================================== - public boolean trySharedLock() { - if (hasExclusiveLock()) return false; + /** + * @return whether we have succesfully acquired the shared lock. + */ + public boolean trySharedLock(Procedure proc) { + if (hasExclusiveLock() && !hasLockAccess(proc)) { + return false; + } + // If no one holds the xlock, then we are free to hold the sharedLock + // If the parent proc or we have already held the xlock, then we return true here as + // xlock is more powerful then shared lock. sharedLock++; return true; } + /** + * @return whether we should wake the procedures waiting on the lock here. + */ public boolean releaseSharedLock() { - return --sharedLock == 0; + // hasExclusiveLock could be true, it usually means we acquire shared lock while we or our + // parent have held the xlock. And since there is still an exclusive lock, we do not need to + // wake any procedures. + return --sharedLock == 0 && !hasExclusiveLock(); } - public boolean tryExclusiveLock(final Procedure proc) { - if (isLocked()) return hasLockAccess(proc); + public boolean tryExclusiveLock(Procedure proc) { + if (isLocked()) { + return hasLockAccess(proc); + } exclusiveLockOwnerProcedure = proc; return true; } /** - * @return True if we released a lock. + * @return whether we should wake the procedures waiting on the lock here. */ - public boolean releaseExclusiveLock(final Procedure proc) { - if (isLockOwner(proc.getProcId())) { - exclusiveLockOwnerProcedure = null; - return true; + public boolean releaseExclusiveLock(Procedure proc) { + if (!isLockOwner(proc.getProcId())) { + // We are not the lock owner, it is probably inherited from the parent procedures. + return false; } - return false; + exclusiveLockOwnerProcedure = null; + // This maybe a bit strange so let me explain. We allow acquiring shared lock while the parent + // proc or we have already held the xlock, and also allow releasing the locks in any order, so + // it could happen that the xlock is released but there are still some procs holding the shared + // lock. + // In HBase, this could happen when a proc which holdLock is false and schedules sub procs which + // acquire the shared lock on the same lock. This is because we will schedule the sub proces + // before releasing the lock, so the sub procs could call acquire lock before we releasing the + // xlock. + return sharedLock == 0; + } + + public boolean isWaitingQueueEmpty() { + return queue.isEmpty(); + } + + public Procedure removeFirst() { + return queue.removeFirst(); + } + + public void addLast(Procedure proc) { + queue.addLast(proc); + } + + public int wakeWaitingProcedures(ProcedureScheduler scheduler) { + int count = queue.size(); + // wakeProcedure adds to the front of queue, so we start from last in the waitQueue' queue, so + // that the procedure which was added first goes in the front for the scheduler queue. + scheduler.addFront(queue.descendingIterator()); + queue.clear(); + return count; + } + + @SuppressWarnings("rawtypes") + public Stream filterWaitingQueue(Predicate predicate) { + return queue.stream().filter(predicate); } @Override public String toString() { - return "exclusiveLockOwner=" + (hasExclusiveLock()? getExclusiveLockProcIdOwner(): "NONE") + - ", sharedLockCount=" + getSharedLockCount() + - ", waitingProcCount=" + size(); + return "exclusiveLockOwner=" + (hasExclusiveLock() ? getExclusiveLockProcIdOwner() : "NONE") + + ", sharedLockCount=" + getSharedLockCount() + ", waitingProcCount=" + queue.size(); } } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java index b0c8d2956b1..031b8bb54ae 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockStatus.java @@ -33,9 +33,9 @@ public interface LockStatus { boolean isLockOwner(long procId); - boolean hasParentLock(final Procedure proc); + boolean hasParentLock(Procedure proc); - boolean hasLockAccess(final Procedure proc); + boolean hasLockAccess(Procedure proc); Procedure getExclusiveLockOwnerProcedure(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 4bd3a41a2d8..096979beb58 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -531,7 +531,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final String namespace = table.getNamespaceAsString(); final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); final LockAndQueue tableLock = locking.getTableLock(table); - if (!namespaceLock.trySharedLock()) { + if (!namespaceLock.trySharedLock(procedure)) { waitProcedure(namespaceLock, procedure); logLockedResource(LockedResourceType.NAMESPACE, namespace); return true; @@ -560,9 +560,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); final LockAndQueue tableLock = locking.getTableLock(table); int waitingCount = 0; - - if (!tableLock.hasParentLock(procedure)) { - tableLock.releaseExclusiveLock(procedure); + if (tableLock.releaseExclusiveLock(procedure)) { waitingCount += wakeWaitingProcedures(tableLock); } if (namespaceLock.releaseSharedLock()) { @@ -591,12 +589,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { try { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); final LockAndQueue tableLock = locking.getTableLock(table); - if (!namespaceLock.trySharedLock()) { + if (!namespaceLock.trySharedLock(procedure)) { waitProcedure(namespaceLock, procedure); return null; } - if (!tableLock.trySharedLock()) { + if (!tableLock.trySharedLock(procedure)) { namespaceLock.releaseSharedLock(); waitProcedure(tableLock, procedure); return null; @@ -690,11 +688,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { Arrays.sort(regionInfo, RegionInfo.COMPARATOR); schedLock(); try { - // If there is parent procedure, it would have already taken xlock, so no need to take - // shared lock here. Otherwise, take shared lock. - if (!procedure.hasParent() - && waitTableQueueSharedLock(procedure, table) == null) { - return true; + assert table != null; + if (waitTableSharedLock(procedure, table)) { + return true; } // acquire region xlocks or wait @@ -702,7 +698,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue[] regionLocks = new LockAndQueue[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { LOG.info("{} checking lock on {}", procedure, regionInfo[i].getEncodedName()); - assert table != null; assert regionInfo[i] != null; assert regionInfo[i].getTable() != null; assert regionInfo[i].getTable().equals(table): regionInfo[i] + " " + procedure; @@ -719,7 +714,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - if (!hasLock && !procedure.hasParent()) { + if (!hasLock) { wakeTableSharedLock(procedure, table); } return !hasLock; @@ -748,14 +743,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { int numProcs = 0; - final Procedure[] nextProcs = new Procedure[regionInfo.length]; + final Procedure[] nextProcs = new Procedure[regionInfo.length]; for (int i = 0; i < regionInfo.length; ++i) { assert regionInfo[i].getTable().equals(table); assert i == 0 || regionInfo[i] != regionInfo[i - 1] : "duplicate region: " + regionInfo[i]; LockAndQueue regionLock = locking.getRegionLock(regionInfo[i].getEncodedName()); if (regionLock.releaseExclusiveLock(procedure)) { - if (!regionLock.isEmpty()) { + if (!regionLock.isWaitingQueueEmpty()) { // release one procedure at the time since regions has an xlock nextProcs[numProcs++] = regionLock.removeFirst(); } else { @@ -769,11 +764,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { wakeProcedure(nextProcs[i]); } wakePollIfNeeded(numProcs); - if (!procedure.hasParent()) { - // release the table shared-lock. - // (if we have a parent, it is holding an xlock so we didn't take the shared-lock) - wakeTableSharedLock(procedure, table); - } + // release the table shared-lock. + wakeTableSharedLock(procedure, table); } finally { schedUnlock(); } @@ -789,15 +781,15 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param namespace Namespace to lock * @return true if the procedure has to wait for the namespace to be available */ - public boolean waitNamespaceExclusiveLock(final Procedure procedure, final String namespace) { + public boolean waitNamespaceExclusiveLock(Procedure procedure, String namespace) { schedLock(); try { final LockAndQueue systemNamespaceTableLock = - locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); - if (!systemNamespaceTableLock.trySharedLock()) { + locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); + if (!systemNamespaceTableLock.trySharedLock(procedure)) { waitProcedure(systemNamespaceTableLock, procedure); logLockedResource(LockedResourceType.TABLE, - TableName.NAMESPACE_TABLE_NAME.getNameAsString()); + TableName.NAMESPACE_TABLE_NAME.getNameAsString()); return true; } @@ -826,13 +818,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace); final LockAndQueue systemNamespaceTableLock = locking.getTableLock(TableName.NAMESPACE_TABLE_NAME); - namespaceLock.releaseExclusiveLock(procedure); int waitingCount = 0; + if (namespaceLock.releaseExclusiveLock(procedure)) { + waitingCount += wakeWaitingProcedures(namespaceLock); + } if (systemNamespaceTableLock.releaseSharedLock()) { addToRunQueue(tableRunQueue, getTableQueue(TableName.NAMESPACE_TABLE_NAME)); waitingCount += wakeWaitingProcedures(systemNamespaceTableLock); } - waitingCount += wakeWaitingProcedures(namespaceLock); wakePollIfNeeded(waitingCount); } finally { schedUnlock(); @@ -878,6 +871,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { final LockAndQueue lock = locking.getServerLock(serverName); + // Only SCP will acquire/release server lock so do not need to check the return value here. lock.releaseExclusiveLock(procedure); // We do not need to create a new queue so just pass null, as in tests we may pass procedures // other than ServerProcedureInterface @@ -929,10 +923,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { schedLock(); try { final LockAndQueue lock = locking.getPeerLock(peerId); - lock.releaseExclusiveLock(procedure); - addToRunQueue(peerRunQueue, getPeerQueue(peerId)); - int waitingCount = wakeWaitingProcedures(lock); - wakePollIfNeeded(waitingCount); + if (lock.releaseExclusiveLock(procedure)) { + addToRunQueue(peerRunQueue, getPeerQueue(peerId)); + int waitingCount = wakeWaitingProcedures(lock); + wakePollIfNeeded(waitingCount); + } } finally { schedUnlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index de1d0ecd9ee..f2bcf91b54b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -125,13 +125,8 @@ class SchemaLocking { List> waitingProcedures = new ArrayList<>(); - for (Procedure procedure : queue) { - if (!(procedure instanceof LockProcedure)) { - continue; - } - - waitingProcedures.add(procedure); - } + queue.filterWaitingQueue(p -> p instanceof LockProcedure) + .forEachOrdered(waitingProcedures::add); return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java index f1293f13516..6fc59be2955 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java @@ -95,9 +95,9 @@ public class TestAdmin2 { TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.metahandler.count", 30); - TEST_UTIL.getConfiguration().setBoolean( - "hbase.master.enabletable.roundrobin", true); + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 30); + TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 30); + TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); TEST_UTIL.startMiniCluster(3); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 65757db3952..e9e81e673ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.master.procedure; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -31,7 +32,10 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.master.locking.LockProcedure; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.LockType; import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.LockedResourceType; @@ -839,29 +843,29 @@ public class TestMasterProcedureScheduler { } public static class TestRegionProcedure extends TestTableProcedure { - private final HRegionInfo[] regionInfo; + private final RegionInfo[] regionInfo; public TestRegionProcedure() { throw new UnsupportedOperationException("recovery should not be triggered here"); } public TestRegionProcedure(long procId, TableName tableName, TableOperationType opType, - HRegionInfo... regionInfo) { + RegionInfo... regionInfo) { this(-1, procId, tableName, opType, regionInfo); } public TestRegionProcedure(long parentProcId, long procId, TableName tableName, - TableOperationType opType, HRegionInfo... regionInfo) { + TableOperationType opType, RegionInfo... regionInfo) { this(-1, parentProcId, procId, tableName, opType, regionInfo); } public TestRegionProcedure(long rootProcId, long parentProcId, long procId, TableName tableName, - TableOperationType opType, HRegionInfo... regionInfo) { + TableOperationType opType, RegionInfo... regionInfo) { super(rootProcId, parentProcId, procId, tableName, opType); this.regionInfo = regionInfo; } - public HRegionInfo[] getRegionInfo() { + public RegionInfo[] getRegionInfo() { return regionInfo; } @@ -1114,5 +1118,30 @@ public class TestMasterProcedureScheduler { assertEquals(LockType.EXCLUSIVE, waitingProcedure3.getType()); assertEquals(procedure3, waitingProcedure3); } + + @Test + public void testAcquireSharedLockWhileParentHoldingExclusiveLock() { + TableName tableName = TableName.valueOf(name.getMethodName()); + RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build(); + + TestTableProcedure parentProc = new TestTableProcedure(1, tableName, TableOperationType.EDIT); + TestRegionProcedure proc = + new TestRegionProcedure(1, 2, tableName, TableOperationType.REGION_EDIT, regionInfo); + queue.addBack(parentProc); + + assertSame(parentProc, queue.poll()); + assertFalse(queue.waitTableExclusiveLock(parentProc, tableName)); + + // The queue for this table should be added back to run queue as the parent has the xlock, so we + // can poll it out. + queue.addBack(proc); + assertSame(proc, queue.poll()); + // the parent has xlock on the table, and it is OK for us to acquire shared lock on the table, + // this is what this test wants to confirm + assertFalse(queue.waitRegion(proc, regionInfo)); + + queue.wakeRegion(proc, regionInfo); + queue.wakeTableExclusiveLock(parentProc, tableName); + } }