HBASE-27392 Add a new procedure type for implementing some global operations such as migration (#4803)

Signed-off-by: Xin Sun <ddupgs@gmail.com>
This commit is contained in:
Duo Zhang 2022-09-29 10:08:02 +08:00 committed by Duo Zhang
parent 7044150545
commit c01c8e45b4
6 changed files with 246 additions and 6 deletions

View File

@ -26,5 +26,6 @@ public enum LockedResourceType {
TABLE,
REGION,
PEER,
META
META,
GLOBAL
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Procedure interface for global operations, such as migration.
* <p/>
* A procedure implementing this interface is recognised by the master procedure scheduler as a
* global procedure and is queued and locked under the id returned by {@link #getGlobalId()}.
*/
@InterfaceAudience.Private
public interface GlobalProcedureInterface {
/**
* Returns the id of the global operation this procedure belongs to. The scheduler uses this
* value as the key of the global queue and of the global lock.
* @return the global id of this procedure
*/
String getGlobalId();
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Scheduler queue holding the procedures of one global operation, keyed by its global id.
*/
@InterfaceAudience.Private
public class GlobalQueue extends Queue<String> {
/**
* Creates the queue for the given global id.
* @param globalId   the key of this queue
* @param lockStatus the lock status passed through to the parent {@code Queue}
*/
public GlobalQueue(String globalId, LockStatus lockStatus) {
super(globalId, lockStatus);
}
/**
* Every procedure placed in a global queue is reported as requiring the exclusive lock,
* regardless of which procedure is asked about.
*/
@Override
boolean requireExclusiveLock(Procedure<?> proc) {
return true;
}
}

View File

@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
@ -95,16 +96,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
(n, k) -> n.compareKey((String) k);
// Compares a MetaQueue node against a lookup key, which is expected to be a TableName.
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
// Compares a GlobalQueue node against a lookup key, which is expected to be the String global id.
private final static AvlKeyComparator<GlobalQueue> GLOBAL_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((String) k);
// One run queue per procedure category; each one is consulted when checking for and polling
// runnable procedures.
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
private final FairQueue<String> globalRunQueue = new FairQueue<>();
// Fixed-size array of server queues.
// NOTE(review): presumably indexed by a hash of the server name - confirm against getServerQueue.
private final ServerQueue[] serverBuckets = new ServerQueue[128];
// Roots of the per-category queue trees; null means no queue of that category exists.
// globalMap is looked up and modified through AvlTree with GLOBAL_QUEUE_KEY_COMPARATOR.
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;
private GlobalQueue globalMap = null;
// Source of the LockAndQueue objects backing the queues, e.g. locking.getGlobalLock(globalId).
private final SchemaLocking locking;
@ -128,6 +133,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else if (isGlobalProcedure(proc)) {
doAdd(globalRunQueue, getGlobalQueue(getGlobalId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
@ -163,14 +170,19 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
@Override
protected boolean queueHasRunnables() {
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables()
|| serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
return globalRunQueue.hasRunnables() || metaRunQueue.hasRunnables()
|| tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables()
|| peerRunQueue.hasRunnables();
}
@Override
protected Procedure dequeue() {
// meta procedure is always the first priority
Procedure<?> pollResult = doPoll(metaRunQueue);
// pull global first
Procedure<?> pollResult = doPoll(globalRunQueue);
// then meta procedure
if (pollResult == null) {
pollResult = doPoll(metaRunQueue);
}
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
@ -268,6 +280,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
peerMap = null;
// Remove Meta
clear(metaMap, metaRunQueue, META_QUEUE_KEY_COMPARATOR);
metaMap = null;
// Remove Global
clear(globalMap, globalRunQueue, GLOBAL_QUEUE_KEY_COMPARATOR);
globalMap = null;
assert size() == 0 : "expected queue size to be 0, got " + size();
}
@ -300,6 +320,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
count += queueSize(globalMap);
return count;
}
@ -502,6 +523,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
return proc instanceof MetaProcedureInterface;
}
// ============================================================================
// Global Queue Lookup Helpers
// ============================================================================
/**
 * Looks up the queue registered for {@code globalId}, creating and registering a new one when
 * no such queue exists yet.
 * @param globalId the key of the global queue
 * @return the already registered queue, or the newly inserted one
 */
private GlobalQueue getGlobalQueue(String globalId) {
  GlobalQueue existing = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
  if (existing == null) {
    // First use of this id: back the new queue with the lock for the same id and link it into
    // the tree of global queues.
    GlobalQueue created = new GlobalQueue(globalId, locking.getGlobalLock(globalId));
    globalMap = AvlTree.insert(globalMap, created);
    return created;
  }
  return existing;
}
/**
 * Unlinks the queue for {@code globalId} from the tree of global queues and then discards the
 * lock registered under the same id.
 * @param globalId the key of the global queue to remove
 */
private void removeGlobalQueue(String globalId) {
  final GlobalQueue rootAfterRemoval =
    AvlTree.remove(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
  globalMap = rootAfterRemoval;
  locking.removeGlobalLock(globalId);
}
/**
 * Removes the queue for {@code globalId} if it exists, is empty, and the given procedure can
 * take the exclusive lock on it. Runs entirely under the scheduler lock.
 * @param globalId  the key of the global queue to clean up
 * @param procedure the procedure on whose behalf the exclusive lock is attempted
 */
private void tryCleanupGlobalQueue(String globalId, Procedure<?> procedure) {
  schedLock();
  try {
    final GlobalQueue target = AvlTree.get(globalMap, globalId, GLOBAL_QUEUE_KEY_COMPARATOR);
    if (target == null) {
      // Nothing registered under this id, so there is nothing to clean up.
      return;
    }
    final LockAndQueue globalLock = locking.getGlobalLock(globalId);
    if (!target.isEmpty()) {
      // Procedures are still queued; keep the queue.
      return;
    }
    if (!globalLock.tryExclusiveLock(procedure)) {
      // The lock could not be taken by this procedure; keep the queue.
      return;
    }
    removeFromRunQueue(globalRunQueue, target,
      () -> "clean up global queue after " + procedure + " completed");
    removeGlobalQueue(globalId);
  } finally {
    schedUnlock();
  }
}
/**
 * Tells whether the given procedure is a global procedure, i.e. whether it implements
 * {@link GlobalProcedureInterface}.
 */
private static boolean isGlobalProcedure(Procedure<?> proc) {
  return GlobalProcedureInterface.class.isInstance(proc);
}
/**
 * Extracts the global id from a procedure that implements {@link GlobalProcedureInterface}.
 * The cast is unchecked here, so callers are expected to have verified the type first.
 */
private static String getGlobalId(Procedure<?> proc) {
  final GlobalProcedureInterface globalProc = (GlobalProcedureInterface) proc;
  return globalProc.getGlobalId();
}
// ============================================================================
// Table Locking Helpers
// ============================================================================
@ -1006,6 +1072,51 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
}
}
// ============================================================================
// Global Locking Helpers
// ============================================================================
/**
 * Try to acquire the exclusive lock on the global resource identified by {@code globalId}.
 * @see #wakeGlobalExclusiveLock(Procedure, String)
 * @param procedure the procedure trying to acquire the lock
 * @param globalId  the id of the global resource to lock
 * @return true if the procedure has to wait for the global resource to be available
 */
public boolean waitGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
  schedLock();
  try {
    final LockAndQueue lock = locking.getGlobalLock(globalId);
    if (lock.tryExclusiveLock(procedure)) {
      // Lock acquired: remove the queue for this id from the run queue while the lock is held.
      removeFromRunQueue(globalRunQueue, getGlobalQueue(globalId),
        () -> procedure + " held exclusive lock");
      return false;
    }
    // Lock is not available to this procedure: register it as waiting on the lock.
    waitProcedure(lock, procedure);
    // Global locks are keyed by globalId, so the resource must be logged under that id; an
    // empty name would not resolve to the lock that the procedure is waiting on.
    logLockedResource(LockedResourceType.GLOBAL, globalId);
    return true;
  } finally {
    schedUnlock();
  }
}
/**
 * Release the exclusive lock on the global resource identified by {@code globalId} and wake the
 * procedures waiting for it.
 * @see #waitGlobalExclusiveLock(Procedure, String)
 * @param procedure the procedure releasing the lock
 * @param globalId  the id of the global resource being unlocked
 */
public void wakeGlobalExclusiveLock(Procedure<?> procedure, String globalId) {
  schedLock();
  try {
    final LockAndQueue lock = locking.getGlobalLock(globalId);
    lock.releaseExclusiveLock(procedure);
    // The lock is free again, so the queue for this id goes back onto the run queue.
    addToRunQueue(globalRunQueue, getGlobalQueue(globalId),
      () -> procedure + " released exclusive lock");
    int waitingCount = wakeWaitingProcedures(lock);
    wakePollIfNeeded(waitingCount);
  } finally {
    schedUnlock();
  }
}
/**
* For debugging. Expensive.
*/

View File

@ -53,6 +53,7 @@ class SchemaLocking {
// Single map for all regions irrespective of tables. Key is encoded region name.
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
// Key is the peer id.
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
// Key is the global id, as passed to getGlobalLock and removeGlobalLock.
private final Map<String, LockAndQueue> globalLocks = new HashMap<>();
// The single lock used for meta; unlike the maps above it is not looked up by key.
private final LockAndQueue metaLock;
public SchemaLocking(Function<Long, Procedure<?>> procedureRetriever) {
@ -94,6 +95,10 @@ class SchemaLocking {
return metaLock;
}
/**
 * Returns the lock for the given global id, as resolved by {@code getLock} against the map of
 * global locks.
 */
LockAndQueue getGlobalLock(String globalId) {
  final LockAndQueue globalLock = getLock(globalLocks, globalId);
  return globalLock;
}
/**
 * Removes the lock registered for the given encoded region name.
 * @return the removed lock, or {@code null} if none was registered under that name
 */
LockAndQueue removeRegionLock(String encodedRegionName) {
  final LockAndQueue removed = regionLocks.remove(encodedRegionName);
  return removed;
}
@ -114,6 +119,10 @@ class SchemaLocking {
return peerLocks.remove(peerId);
}
/**
 * Removes the lock registered for the given global id.
 * @return the removed lock, or {@code null} if none was registered under that id
 */
LockAndQueue removeGlobalLock(String globalId) {
  final LockAndQueue removed = globalLocks.remove(globalId);
  return removed;
}
private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
LockAndQueue queue) {
LockType lockType;
@ -164,6 +173,8 @@ class SchemaLocking {
addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock),
tn -> tn.getNameAsString(), LockedResourceType.META);
addToLockedResources(lockedResources, globalLocks, Function.identity(),
LockedResourceType.GLOBAL);
return lockedResources;
}
@ -191,6 +202,10 @@ class SchemaLocking {
break;
case META:
queue = metaLock;
break;
case GLOBAL:
queue = globalLocks.get(resourceName);
break;
default:
queue = null;
break;
@ -216,7 +231,8 @@ class SchemaLocking {
+ filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks)
+ ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks="
+ filterUnlocked(this.peerLocks) + ", metaLocks="
+ filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock));
+ filterUnlocked(ImmutableMap.of(TableName.META_TABLE_NAME, metaLock)) + ", globalLocks="
+ filterUnlocked(globalLocks);
}
private String filterUnlocked(Map<?, LockAndQueue> locks) {

View File

@ -940,6 +940,21 @@ public class TestMasterProcedureScheduler {
}
}
/**
* Test procedure that implements {@link GlobalProcedureInterface} and reports a fixed global id
* supplied at construction time.
*/
public static class TestGlobalProcedure extends TestProcedure
implements GlobalProcedureInterface {
// The global id returned by getGlobalId(); never changes after construction.
private final String globalId;
/**
* @param procId   the procedure id passed to the parent test procedure
* @param globalId the global id this procedure reports
*/
public TestGlobalProcedure(long procId, String globalId) {
super(procId);
this.globalId = globalId;
}
@Override
public String getGlobalId() {
return globalId;
}
}
private static LockProcedure createLockProcedure(LockType lockType, long procId)
throws Exception {
LockProcedure procedure = new LockProcedure();
@ -1093,6 +1108,39 @@ public class TestMasterProcedureScheduler {
assertEquals(1, resource.getWaitingProcedures().size());
}
/**
 * Verifies that a held global exclusive lock is listed as a single GLOBAL locked resource, that
 * the owner can re-acquire it without waiting, and that a second procedure asking for the same
 * global id is put on the waiting list without changing the owner.
 */
@Test
public void testListLocksGlobal() throws Exception {
  String globalId = "1";
  LockProcedure procedure = createExclusiveLockProcedure(4);
  // The lock is free, so the first acquisition must succeed without waiting
  assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId));
  List<LockedResource> locks = queue.getLocks();
  assertEquals(1, locks.size());
  LockedResource resource = locks.get(0);
  assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
  assertExclusiveLock(resource, procedure);
  assertTrue(resource.getWaitingProcedures().isEmpty());
  // Try to acquire the exclusive lock again with same procedure
  assertFalse(queue.waitGlobalExclusiveLock(procedure, globalId));
  // Try to acquire the exclusive lock again with new procedure
  LockProcedure procedure2 = createExclusiveLockProcedure(5);
  assertTrue(queue.waitGlobalExclusiveLock(procedure2, globalId));
  // Same globalId, still only has 1 LockedResource
  locks = queue.getLocks();
  assertEquals(1, locks.size());
  resource = locks.get(0);
  assertLockResource(resource, LockedResourceType.GLOBAL, globalId);
  // LockedResource owner is still the original procedure
  assertExclusiveLock(resource, procedure);
  // The new procedure should be in the waiting list
  assertEquals(1, resource.getWaitingProcedures().size());
}
@Test
public void testListLocksWaiting() throws Exception {
LockProcedure procedure1 = createExclusiveLockProcedure(1);