HBASE-17068 Procedure v2 - inherit region locks (Matteo Bertozzi)
This commit is contained in:
parent
319ecd867a
commit
306ef83c9c
|
@ -412,15 +412,27 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
return exclusiveLockProcIdOwner == procId;
|
||||
}
|
||||
|
||||
public boolean tryExclusiveLock(final long procIdOwner) {
|
||||
assert procIdOwner != Long.MIN_VALUE;
|
||||
if (hasExclusiveLock() && !isLockOwner(procIdOwner)) return false;
|
||||
exclusiveLockProcIdOwner = procIdOwner;
|
||||
public boolean hasParentLock(final Procedure proc) {
|
||||
return proc.hasParent() &&
|
||||
(isLockOwner(proc.getParentProcId()) || isLockOwner(proc.getRootProcId()));
|
||||
}
|
||||
|
||||
public boolean hasLockAccess(final Procedure proc) {
|
||||
return isLockOwner(proc.getProcId()) || hasParentLock(proc);
|
||||
}
|
||||
|
||||
public boolean tryExclusiveLock(final Procedure proc) {
|
||||
if (hasExclusiveLock()) return hasLockAccess(proc);
|
||||
exclusiveLockProcIdOwner = proc.getProcId();
|
||||
return true;
|
||||
}
|
||||
|
||||
private void releaseExclusiveLock() {
|
||||
public boolean releaseExclusiveLock(final Procedure proc) {
|
||||
if (isLockOwner(proc.getProcId())) {
|
||||
exclusiveLockProcIdOwner = Long.MIN_VALUE;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public HRegionInfo getRegionInfo() {
|
||||
|
@ -443,7 +455,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
public static class TableQueue extends QueueImpl<TableName> {
|
||||
private final NamespaceQueue namespaceQueue;
|
||||
|
||||
private HashMap<HRegionInfo, RegionEvent> regionEventMap;
|
||||
private HashMap<String, RegionEvent> regionEventMap;
|
||||
private TableLock tableLock = null;
|
||||
|
||||
public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) {
|
||||
|
@ -476,18 +488,18 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
|
||||
public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) {
|
||||
if (regionEventMap == null) {
|
||||
regionEventMap = new HashMap<HRegionInfo, RegionEvent>();
|
||||
regionEventMap = new HashMap<String, RegionEvent>();
|
||||
}
|
||||
RegionEvent event = regionEventMap.get(regionInfo);
|
||||
RegionEvent event = regionEventMap.get(regionInfo.getEncodedName());
|
||||
if (event == null) {
|
||||
event = new RegionEvent(regionInfo);
|
||||
regionEventMap.put(regionInfo, event);
|
||||
regionEventMap.put(regionInfo.getEncodedName(), event);
|
||||
}
|
||||
return event;
|
||||
}
|
||||
|
||||
public synchronized void removeRegionEvent(final RegionEvent event) {
|
||||
regionEventMap.remove(event.getRegionInfo());
|
||||
regionEventMap.remove(event.getRegionInfo().getEncodedName());
|
||||
if (regionEventMap.isEmpty()) {
|
||||
regionEventMap = null;
|
||||
}
|
||||
|
@ -675,7 +687,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
|
||||
if (!hasXLock) {
|
||||
schedLock();
|
||||
if (!hasParentLock) queue.releaseExclusiveLock();
|
||||
if (!hasParentLock) queue.releaseExclusiveLock(procedure);
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
schedUnlock();
|
||||
|
@ -699,7 +711,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
|
||||
schedLock();
|
||||
if (!hasParentLock) queue.releaseExclusiveLock();
|
||||
if (!hasParentLock) queue.releaseExclusiveLock(procedure);
|
||||
queue.getNamespaceQueue().releaseSharedLock();
|
||||
addToRunQueue(tableRunQueue, queue);
|
||||
schedUnlock();
|
||||
|
@ -846,11 +858,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i];
|
||||
|
||||
event[i] = queue.getRegionEvent(regionInfo[i]);
|
||||
if (!event[i].tryExclusiveLock(procedure.getProcId())) {
|
||||
if (!event[i].tryExclusiveLock(procedure)) {
|
||||
suspendProcedure(event[i], procedure);
|
||||
hasLock = false;
|
||||
while (i-- > 0) {
|
||||
event[i].releaseExclusiveLock();
|
||||
event[i].releaseExclusiveLock(procedure);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -892,7 +904,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
assert i == 0 || regionInfo[i] != regionInfo[i-1] : "duplicate region: " + regionInfo[i];
|
||||
|
||||
RegionEvent event = queue.getRegionEvent(regionInfo[i]);
|
||||
event.releaseExclusiveLock();
|
||||
if (event.releaseExclusiveLock(procedure)) {
|
||||
if (event.hasWaitingProcedures()) {
|
||||
// release one procedure at the time since regions has an xlock
|
||||
nextProcs[numProcs++] = event.popWaitingProcedure(true);
|
||||
|
@ -901,6 +913,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// awake procedures if any
|
||||
schedLock();
|
||||
|
@ -960,7 +973,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
final TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME);
|
||||
final NamespaceQueue queue = getNamespaceQueue(nsName);
|
||||
|
||||
queue.releaseExclusiveLock();
|
||||
queue.releaseExclusiveLock(procedure);
|
||||
if (tableQueue.releaseSharedLock()) {
|
||||
addToRunQueue(tableRunQueue, tableQueue);
|
||||
}
|
||||
|
@ -1005,7 +1018,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
schedLock();
|
||||
try {
|
||||
ServerQueue queue = getServerQueue(serverName);
|
||||
queue.releaseExclusiveLock();
|
||||
queue.releaseExclusiveLock(procedure);
|
||||
addToRunQueue(serverRunQueue, queue);
|
||||
} finally {
|
||||
schedUnlock();
|
||||
|
@ -1135,8 +1148,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
|||
return true;
|
||||
}
|
||||
|
||||
public synchronized void releaseExclusiveLock() {
|
||||
public synchronized boolean releaseExclusiveLock(final Procedure proc) {
|
||||
if (isLockOwner(proc.getProcId())) {
|
||||
exclusiveLockProcIdOwner = Long.MIN_VALUE;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// This should go away when we have the new AM and its events
|
||||
|
|
|
@ -542,6 +542,49 @@ public class TestMasterProcedureScheduler {
|
|||
queue.releaseTableExclusiveLock(rootProc, tableName);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testInheritedRegionXLock() {
|
||||
final TableName tableName = TableName.valueOf("testInheritedRegionXLock");
|
||||
final HRegionInfo region = new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b"));
|
||||
|
||||
queue.addBack(new TestRegionProcedure(1, tableName,
|
||||
TableProcedureInterface.TableOperationType.SPLIT, region));
|
||||
queue.addBack(new TestRegionProcedure(1, 2, tableName,
|
||||
TableProcedureInterface.TableOperationType.UNASSIGN, region));
|
||||
queue.addBack(new TestRegionProcedure(3, tableName,
|
||||
TableProcedureInterface.TableOperationType.REGION_EDIT, region));
|
||||
|
||||
// fetch the root proc and take the lock on the region
|
||||
Procedure rootProc = queue.poll();
|
||||
assertEquals(1, rootProc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(rootProc, region));
|
||||
|
||||
// fetch the sub-proc and take the lock on the region (inherited lock)
|
||||
Procedure childProc = queue.poll();
|
||||
assertEquals(2, childProc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(childProc, region));
|
||||
|
||||
// proc-3 will be fetched but it can't take the lock
|
||||
Procedure proc = queue.poll();
|
||||
assertEquals(3, proc.getProcId());
|
||||
assertEquals(true, queue.waitRegion(proc, region));
|
||||
|
||||
// release the child lock
|
||||
queue.wakeRegion(childProc, region);
|
||||
|
||||
// nothing in the queue (proc-3 is suspended)
|
||||
assertEquals(null, queue.poll(0));
|
||||
|
||||
// release the root lock
|
||||
queue.wakeRegion(rootProc, region);
|
||||
|
||||
// proc-3 should be now available
|
||||
proc = queue.poll();
|
||||
assertEquals(3, proc.getProcId());
|
||||
assertEquals(false, queue.waitRegion(proc, region));
|
||||
queue.wakeRegion(proc, region);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSuspendedProcedure() throws Exception {
|
||||
final TableName tableName = TableName.valueOf("testSuspendedProcedure");
|
||||
|
|
Loading…
Reference in New Issue