HBASE-16735 Procedure v2 - Fix yield while holding locks
This commit is contained in:
parent
a46134bffc
commit
e868d9586f
|
@ -150,23 +150,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
if (proc.isSuspended()) return;
|
||||
|
||||
queue.add(proc, addFront);
|
||||
if (!(queue.isSuspended() ||
|
||||
(queue.hasExclusiveLock() && !queue.isLockOwner(proc.getProcId())))) {
|
||||
// the queue is not suspended or removed from the fairq (run-queue)
|
||||
// because someone has an xlock on it.
|
||||
// so, if the queue is not-linked we should add it
|
||||
if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) {
|
||||
fairq.add(queue);
|
||||
}
|
||||
if (!queue.hasExclusiveLock() || queue.isLockOwner(proc.getProcId())) {
|
||||
// if the queue was not remove for an xlock execution
|
||||
// or the proc is the lock owner, put the queue back into execution
|
||||
addToRunQueue(fairq, queue);
|
||||
} else if (queue.hasParentLock(proc)) {
|
||||
assert addFront : "expected to add a child in the front";
|
||||
assert !queue.isSuspended() : "unexpected suspended state for the queue";
|
||||
// our (proc) parent has the xlock,
|
||||
// so the queue is not in the fairq (run-queue)
|
||||
// add it back to let the child run (inherit the lock)
|
||||
if (!AvlIterableList.isLinked(queue)) {
|
||||
fairq.add(queue);
|
||||
}
|
||||
addToRunQueue(fairq, queue);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -348,16 +342,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
}
|
||||
|
||||
private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
|
||||
if (AvlIterableList.isLinked(queue)) return;
|
||||
if (!queue.isEmpty()) {
|
||||
if (!AvlIterableList.isLinked(queue) &&
|
||||
!queue.isEmpty() && !queue.isSuspended()) {
|
||||
fairq.add(queue);
|
||||
}
|
||||
}
|
||||
|
||||
private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
|
||||
if (!AvlIterableList.isLinked(queue)) return;
|
||||
if (AvlIterableList.isLinked(queue)) {
|
||||
fairq.remove(queue);
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// TODO: Metrics
|
||||
|
@ -924,6 +919,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
case MERGE:
|
||||
case ASSIGN:
|
||||
case UNASSIGN:
|
||||
case REGION_EDIT:
|
||||
return false;
|
||||
default:
|
||||
break;
|
||||
|
@ -1154,8 +1150,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
// Zk lock is expensive...
|
||||
queue.releaseZkSharedLock(lockManager);
|
||||
|
||||
queue.releaseSharedLock();
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
if (queue.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
}
|
||||
schedLock.unlock();
|
||||
}
|
||||
|
||||
|
@ -1354,11 +1352,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) {
|
||||
schedLock.lock();
|
||||
try {
|
||||
TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||
tableQueue.releaseSharedLock();
|
||||
final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||
final NamespaceQueue queue = getNamespaceQueue(nsName);
|
||||
|
||||
NamespaceQueue queue = getNamespaceQueue(nsName);
|
||||
queue.releaseExclusiveLock();
|
||||
if (tableQueue.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, tableQueue);
|
||||
}
|
||||
} finally {
|
||||
schedLock.unlock();
|
||||
}
|
||||
|
@ -1503,8 +1503,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
|
|||
return true;
|
||||
}
|
||||
|
||||
public synchronized void releaseSharedLock() {
|
||||
sharedLock--;
|
||||
public synchronized boolean releaseSharedLock() {
|
||||
return --sharedLock == 0;
|
||||
}
|
||||
|
||||
protected synchronized boolean isSingleSharedLock() {
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
|
|||
public interface TableProcedureInterface {
|
||||
public enum TableOperationType {
|
||||
CREATE, DELETE, DISABLE, EDIT, ENABLE, READ,
|
||||
SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */
|
||||
REGION_EDIT, SPLIT, MERGE, ASSIGN, UNASSIGN, /* region operations */
|
||||
};
|
||||
|
||||
/**
|
||||
|
|
|
@ -501,11 +501,11 @@ public class TestMasterProcedureScheduler {
|
|||
// (this step is done by the executor/rootProc, we are simulating it)
|
||||
Procedure[] subProcs = new Procedure[] {
|
||||
new TestRegionProcedure(1, 2, tableName,
|
||||
TableProcedureInterface.TableOperationType.ASSIGN, regionA),
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionA),
|
||||
new TestRegionProcedure(1, 3, tableName,
|
||||
TableProcedureInterface.TableOperationType.ASSIGN, regionB),
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionB),
|
||||
new TestRegionProcedure(1, 4, tableName,
|
||||
TableProcedureInterface.TableOperationType.ASSIGN, regionC),
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, regionC),
|
||||
};
|
||||
|
||||
// at this point the rootProc is going in a waiting state
|
||||
|
@ -678,6 +678,79 @@ public class TestMasterProcedureScheduler {
|
|||
queue.releaseTableExclusiveLock(parentProc, tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYieldWithXLockHeld() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testYieldWithXLockHeld");
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// fetch from the queue and acquire xlock for the first proc
|
||||
Procedure proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
|
||||
|
||||
// nothing available, until xlock release
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// put the proc in the queue
|
||||
queue.yield(proc);
|
||||
|
||||
// fetch from the queue, it should be the one with just added back
|
||||
proc = queue.poll();
|
||||
assertEquals(1, proc.getProcId());
|
||||
|
||||
// release the xlock
|
||||
queue.releaseTableExclusiveLock(proc, tableName);
|
||||
|
||||
proc = queue.poll();
|
||||
assertEquals(2, proc.getProcId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testYieldWithSharedLockHeld() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testYieldWithSharedLockHeld");
|
||||
|
||||
queue.addBack(new TestTableProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(2, tableName,
|
||||
TableProcedureInterface.TableOperationType.READ));
|
||||
queue.addBack(new TestTableProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.EDIT));
|
||||
|
||||
// fetch and acquire the first shared-lock
|
||||
Procedure proc1 = queue.poll();
|
||||
assertEquals(1, proc1.getProcId());
|
||||
assertEquals(true, queue.tryAcquireTableSharedLock(proc1, tableName));
|
||||
|
||||
// fetch and acquire the second shared-lock
|
||||
Procedure proc2 = queue.poll();
|
||||
assertEquals(2, proc2.getProcId());
|
||||
assertEquals(true, queue.tryAcquireTableSharedLock(proc2, tableName));
|
||||
|
||||
// nothing available, until xlock release
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// put the procs back in the queue
|
||||
queue.yield(proc2);
|
||||
queue.yield(proc1);
|
||||
|
||||
// fetch from the queue, it should fetch the ones with just added back
|
||||
proc1 = queue.poll();
|
||||
assertEquals(1, proc1.getProcId());
|
||||
proc2 = queue.poll();
|
||||
assertEquals(2, proc2.getProcId());
|
||||
|
||||
// release the xlock
|
||||
queue.releaseTableSharedLock(proc1, tableName);
|
||||
queue.releaseTableSharedLock(proc2, tableName);
|
||||
|
||||
Procedure proc3 = queue.poll();
|
||||
assertEquals(3, proc3.getProcId());
|
||||
}
|
||||
|
||||
public static class TestTableProcedure extends TestProcedure
|
||||
implements TableProcedureInterface {
|
||||
private final TableOperationType opType;
|
||||
|
@ -710,7 +783,7 @@ public class TestMasterProcedureScheduler {
|
|||
@Override
|
||||
public void toStringClassDetails(final StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" (table=");
|
||||
sb.append("(table=");
|
||||
sb.append(getTableName());
|
||||
sb.append(")");
|
||||
}
|
||||
|
@ -754,7 +827,7 @@ public class TestMasterProcedureScheduler {
|
|||
@Override
|
||||
public void toStringClassDetails(final StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append(" (region=");
|
||||
sb.append("(regions=");
|
||||
sb.append(Arrays.toString(getRegionInfo()));
|
||||
sb.append(")");
|
||||
}
|
||||
|
@ -784,5 +857,14 @@ public class TestMasterProcedureScheduler {
|
|||
public TableOperationType getTableOperationType() {
|
||||
return opType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toStringClassDetails(final StringBuilder sb) {
|
||||
sb.append(getClass().getSimpleName());
|
||||
sb.append("(ns=");
|
||||
sb.append(nsName);
|
||||
sb.append(")");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue