HBASE-20000 Remove the quantum logic in FairQueue, always put high priority queue in front
This commit is contained in:
parent
d0f2d18ca7
commit
c18e7a963d
|
@ -548,6 +548,20 @@ public final class AvlUtil {
|
||||||
return head;
|
return head;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param head the head of the linked list
|
||||||
|
* @param base the node which we want to add the {@code node} before it
|
||||||
|
* @param node the node which we want to add it before the {@code base} node
|
||||||
|
*/
|
||||||
|
public static <TNode extends AvlLinkedNode> TNode prepend(TNode head, TNode base, TNode node) {
|
||||||
|
assert !isLinked(node) : node + " is already linked";
|
||||||
|
node.iterNext = base;
|
||||||
|
node.iterPrev = base.iterPrev;
|
||||||
|
base.iterPrev.iterNext = node;
|
||||||
|
base.iterPrev = node;
|
||||||
|
return head == base ? node : head;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param node the node to check
|
* @param node the node to check
|
||||||
* @return true if the node is linked to a list, false otherwise
|
* @return true if the node is linked to a list, false otherwise
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class FairQueue<T extends Comparable<T>> {
|
||||||
|
|
||||||
|
private Queue<T> queueHead = null;
|
||||||
|
private int size = 0;
|
||||||
|
|
||||||
|
public boolean hasRunnables() {
|
||||||
|
return size > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(Queue<T> queue) {
|
||||||
|
// For normal priority queue, just append it to the tail
|
||||||
|
if (queueHead == null || queue.getPriority() == 1) {
|
||||||
|
queueHead = AvlIterableList.append(queueHead, queue);
|
||||||
|
size++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Find the one which priority is less than us
|
||||||
|
// For now only TableQueue has priority, and there are only a small number of tables which
|
||||||
|
// have higher priority so this will not be an expensive operation.
|
||||||
|
Queue<T> base = queueHead;
|
||||||
|
do {
|
||||||
|
if (base.getPriority() < queue.getPriority()) {
|
||||||
|
queueHead = AvlIterableList.prepend(queueHead, base, queue);
|
||||||
|
size++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
base = AvlIterableList.readNext(base);
|
||||||
|
} while (base != queueHead);
|
||||||
|
// no one is lower than us, append to the tail
|
||||||
|
queueHead = AvlIterableList.append(queueHead, queue);
|
||||||
|
size++;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void remove(Queue<T> queue) {
|
||||||
|
queueHead = AvlIterableList.remove(queueHead, queue);
|
||||||
|
size--;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Queue<T> poll() {
|
||||||
|
if (queueHead == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
Queue<T> q = queueHead;
|
||||||
|
do {
|
||||||
|
if (q.isAvailable()) {
|
||||||
|
if (q.getPriority() == 1) {
|
||||||
|
// for the normal priority queue, remove it and append it to the tail
|
||||||
|
queueHead = AvlIterableList.remove(queueHead, q);
|
||||||
|
queueHead = AvlIterableList.append(queueHead, q);
|
||||||
|
}
|
||||||
|
return q;
|
||||||
|
}
|
||||||
|
q = AvlIterableList.readNext(q);
|
||||||
|
} while (q != queueHead);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -106,7 +106,7 @@ public class MasterProcedureEnv implements ConfigurationObserver {
|
||||||
public MasterProcedureEnv(final MasterServices master,
|
public MasterProcedureEnv(final MasterServices master,
|
||||||
final RSProcedureDispatcher remoteDispatcher) {
|
final RSProcedureDispatcher remoteDispatcher) {
|
||||||
this.master = master;
|
this.master = master;
|
||||||
this.procSched = new MasterProcedureScheduler(master.getConfiguration());
|
this.procSched = new MasterProcedureScheduler();
|
||||||
this.remoteDispatcher = remoteDispatcher;
|
this.remoteDispatcher = remoteDispatcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,38 +19,27 @@
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
|
||||||
import java.util.function.Function;
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableExistsException;
|
import org.apache.hadoop.hbase.TableExistsException;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||||
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
|
||||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
|
|
||||||
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
|
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
|
||||||
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
|
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
|
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.LockType;
|
|
||||||
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||||
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureDeque;
|
|
||||||
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
|
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
|
||||||
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
|
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
|
||||||
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
|
|
||||||
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
|
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
|
||||||
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
|
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -106,12 +95,12 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
|
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
|
||||||
|
|
||||||
private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR =
|
private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
|
||||||
new ServerQueueKeyComparator();
|
(n, k) -> n.compareKey((ServerName) k);
|
||||||
private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR =
|
private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
|
||||||
new TableQueueKeyComparator();
|
(n, k) -> n.compareKey((TableName) k);
|
||||||
private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR =
|
private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
|
||||||
new PeerQueueKeyComparator();
|
(n, k) -> n.compareKey((String) k);
|
||||||
|
|
||||||
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
|
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
|
||||||
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
|
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
|
||||||
|
@ -123,39 +112,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
|
|
||||||
private final SchemaLocking locking = new SchemaLocking();
|
private final SchemaLocking locking = new SchemaLocking();
|
||||||
|
|
||||||
/**
|
|
||||||
* Table priority is used when scheduling procedures from {@link #tableRunQueue}. A TableQueue
|
|
||||||
* with priority 2 will get its procedures scheduled at twice the rate as compared to
|
|
||||||
* TableQueue with priority 1. This should be enough to ensure system/meta get assigned out
|
|
||||||
* before user-space tables. HBASE-18109 is where we conclude what is here is good enough.
|
|
||||||
* Lets open new issue if we find it not enough.
|
|
||||||
*/
|
|
||||||
private static class TablePriorities {
|
|
||||||
final int metaTablePriority;
|
|
||||||
final int userTablePriority;
|
|
||||||
final int sysTablePriority;
|
|
||||||
|
|
||||||
TablePriorities(Configuration conf) {
|
|
||||||
metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3);
|
|
||||||
sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2);
|
|
||||||
userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
int getPriority(TableName tableName) {
|
|
||||||
if (tableName.equals(TableName.META_TABLE_NAME)) {
|
|
||||||
return metaTablePriority;
|
|
||||||
} else if (tableName.isSystemTable()) {
|
|
||||||
return sysTablePriority;
|
|
||||||
}
|
|
||||||
return userTablePriority;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
private final TablePriorities tablePriorities;
|
|
||||||
|
|
||||||
public MasterProcedureScheduler(final Configuration conf) {
|
|
||||||
tablePriorities = new TablePriorities(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void yield(final Procedure proc) {
|
public void yield(final Procedure proc) {
|
||||||
push(proc, isTableProcedure(proc), true);
|
push(proc, isTableProcedure(proc), true);
|
||||||
|
@ -216,13 +172,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
return pollResult;
|
return pollResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
|
private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
|
||||||
final Queue<T> rq = fairq.poll();
|
final Queue<T> rq = fairq.poll();
|
||||||
if (rq == null || !rq.isAvailable()) {
|
if (rq == null || !rq.isAvailable()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Procedure pollResult = rq.peek();
|
final Procedure<?> pollResult = rq.peek();
|
||||||
if (pollResult == null) {
|
if (pollResult == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -240,7 +196,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
// if the rq is in the fairq because of runnable child
|
// if the rq is in the fairq because of runnable child
|
||||||
// check if the next procedure is still a child.
|
// check if the next procedure is still a child.
|
||||||
// if not, remove the rq from the fairq and go back to the xlock state
|
// if not, remove the rq from the fairq and go back to the xlock state
|
||||||
Procedure nextProc = rq.peek();
|
Procedure<?> nextProc = rq.peek();
|
||||||
if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
|
if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
|
||||||
removeFromRunQueue(fairq, rq);
|
removeFromRunQueue(fairq, rq);
|
||||||
}
|
}
|
||||||
|
@ -249,61 +205,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
return pollResult;
|
return pollResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
private LockedResource createLockedResource(LockedResourceType resourceType,
|
|
||||||
String resourceName, LockAndQueue queue) {
|
|
||||||
LockType lockType;
|
|
||||||
Procedure<?> exclusiveLockOwnerProcedure;
|
|
||||||
int sharedLockCount;
|
|
||||||
|
|
||||||
if (queue.hasExclusiveLock()) {
|
|
||||||
lockType = LockType.EXCLUSIVE;
|
|
||||||
exclusiveLockOwnerProcedure = queue.getExclusiveLockOwnerProcedure();
|
|
||||||
sharedLockCount = 0;
|
|
||||||
} else {
|
|
||||||
lockType = LockType.SHARED;
|
|
||||||
exclusiveLockOwnerProcedure = null;
|
|
||||||
sharedLockCount = queue.getSharedLockCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Procedure<?>> waitingProcedures = new ArrayList<>();
|
|
||||||
|
|
||||||
for (Procedure<?> procedure : queue) {
|
|
||||||
if (!(procedure instanceof LockProcedure)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
waitingProcedures.add(procedure);
|
|
||||||
}
|
|
||||||
|
|
||||||
return new LockedResource(resourceType, resourceName, lockType,
|
|
||||||
exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures);
|
|
||||||
}
|
|
||||||
|
|
||||||
private <T> void addToLockedResources(List<LockedResource> lockedResources,
|
|
||||||
Map<T, LockAndQueue> locks, Function<T, String> keyTransformer,
|
|
||||||
LockedResourceType resourcesType) {
|
|
||||||
locks.entrySet().stream().filter(e -> e.getValue().isLocked())
|
|
||||||
.map(
|
|
||||||
e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue()))
|
|
||||||
.forEachOrdered(lockedResources::add);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<LockedResource> getLocks() {
|
public List<LockedResource> getLocks() {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
List<LockedResource> lockedResources = new ArrayList<>();
|
return locking.getLocks();
|
||||||
addToLockedResources(lockedResources, locking.serverLocks, sn -> sn.getServerName(),
|
|
||||||
LockedResourceType.SERVER);
|
|
||||||
addToLockedResources(lockedResources, locking.namespaceLocks, Function.identity(),
|
|
||||||
LockedResourceType.NAMESPACE);
|
|
||||||
addToLockedResources(lockedResources, locking.tableLocks, tn -> tn.getNameAsString(),
|
|
||||||
LockedResourceType.TABLE);
|
|
||||||
addToLockedResources(lockedResources, locking.regionLocks, Function.identity(),
|
|
||||||
LockedResourceType.REGION);
|
|
||||||
addToLockedResources(lockedResources, locking.peerLocks, Function.identity(),
|
|
||||||
LockedResourceType.PEER);
|
|
||||||
return lockedResources;
|
|
||||||
} finally {
|
} finally {
|
||||||
schedUnlock();
|
schedUnlock();
|
||||||
}
|
}
|
||||||
|
@ -311,27 +217,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
|
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
|
||||||
LockAndQueue queue = null;
|
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
switch (resourceType) {
|
return locking.getLockResource(resourceType, resourceName);
|
||||||
case SERVER:
|
|
||||||
queue = locking.serverLocks.get(ServerName.valueOf(resourceName));
|
|
||||||
break;
|
|
||||||
case NAMESPACE:
|
|
||||||
queue = locking.namespaceLocks.get(resourceName);
|
|
||||||
break;
|
|
||||||
case TABLE:
|
|
||||||
queue = locking.tableLocks.get(TableName.valueOf(resourceName));
|
|
||||||
break;
|
|
||||||
case REGION:
|
|
||||||
queue = locking.regionLocks.get(resourceName);
|
|
||||||
break;
|
|
||||||
case PEER:
|
|
||||||
queue = locking.peerLocks.get(resourceName);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return queue != null ? createLockedResource(resourceType, resourceName, queue) : null;
|
|
||||||
} finally {
|
} finally {
|
||||||
schedUnlock();
|
schedUnlock();
|
||||||
}
|
}
|
||||||
|
@ -348,7 +236,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void clearQueue() {
|
private void clearQueue() {
|
||||||
// Remove Servers
|
// Remove Servers
|
||||||
for (int i = 0; i < serverBuckets.length; ++i) {
|
for (int i = 0; i < serverBuckets.length; ++i) {
|
||||||
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
|
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
|
||||||
|
@ -450,7 +338,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
|
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
|
||||||
if (node != null) return node;
|
if (node != null) return node;
|
||||||
|
|
||||||
node = new TableQueue(tableName, tablePriorities.getPriority(tableName),
|
node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
|
||||||
locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
|
locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
|
||||||
tableMap = AvlTree.insert(tableMap, node);
|
tableMap = AvlTree.insert(tableMap, node);
|
||||||
return node;
|
return node;
|
||||||
|
@ -512,7 +400,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
locking.removePeerLock(peerId);
|
locking.removePeerLock(peerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void tryCleanupPeerQueue(String peerId, Procedure procedure) {
|
private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
|
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
|
||||||
|
@ -538,148 +426,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
return ((PeerProcedureInterface) proc).getPeerId();
|
return ((PeerProcedureInterface) proc).getPeerId();
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
|
||||||
// Table and Server Queue Implementation
|
|
||||||
// ============================================================================
|
|
||||||
private static class ServerQueueKeyComparator implements AvlKeyComparator<ServerQueue> {
|
|
||||||
@Override
|
|
||||||
public int compareKey(ServerQueue node, Object key) {
|
|
||||||
return node.compareKey((ServerName)key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class ServerQueue extends Queue<ServerName> {
|
|
||||||
public ServerQueue(ServerName serverName, LockStatus serverLock) {
|
|
||||||
super(serverName, serverLock);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean requireExclusiveLock(Procedure proc) {
|
|
||||||
ServerProcedureInterface spi = (ServerProcedureInterface)proc;
|
|
||||||
switch (spi.getServerOperationType()) {
|
|
||||||
case CRASH_HANDLER:
|
|
||||||
return true;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class TableQueueKeyComparator implements AvlKeyComparator<TableQueue> {
|
|
||||||
@Override
|
|
||||||
public int compareKey(TableQueue node, Object key) {
|
|
||||||
return node.compareKey((TableName)key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class TableQueue extends Queue<TableName> {
|
|
||||||
private final LockStatus namespaceLockStatus;
|
|
||||||
|
|
||||||
public TableQueue(TableName tableName, int priority, LockStatus tableLock,
|
|
||||||
LockStatus namespaceLockStatus) {
|
|
||||||
super(tableName, priority, tableLock);
|
|
||||||
this.namespaceLockStatus = namespaceLockStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAvailable() {
|
|
||||||
// if there are no items in the queue, or the namespace is locked.
|
|
||||||
// we can't execute operation on this table
|
|
||||||
if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (getLockStatus().hasExclusiveLock()) {
|
|
||||||
// if we have an exclusive lock already taken
|
|
||||||
// only child of the lock owner can be executed
|
|
||||||
final Procedure nextProc = peek();
|
|
||||||
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
|
|
||||||
}
|
|
||||||
|
|
||||||
// no xlock
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean requireExclusiveLock(Procedure proc) {
|
|
||||||
return requireTableExclusiveLock((TableProcedureInterface)proc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class PeerQueueKeyComparator implements AvlKeyComparator<PeerQueue> {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareKey(PeerQueue node, Object key) {
|
|
||||||
return node.compareKey((String) key);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class PeerQueue extends Queue<String> {
|
|
||||||
|
|
||||||
public PeerQueue(String peerId, LockStatus lockStatus) {
|
|
||||||
super(peerId, lockStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean requireExclusiveLock(Procedure proc) {
|
|
||||||
return requirePeerExclusiveLock((PeerProcedureInterface) proc);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isAvailable() {
|
|
||||||
if (isEmpty()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (getLockStatus().hasExclusiveLock()) {
|
|
||||||
// if we have an exclusive lock already taken
|
|
||||||
// only child of the lock owner can be executed
|
|
||||||
Procedure nextProc = peek();
|
|
||||||
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Table Locking Helpers
|
// Table Locking Helpers
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
/**
|
|
||||||
* @param proc must not be null
|
|
||||||
*/
|
|
||||||
private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
|
|
||||||
switch (proc.getTableOperationType()) {
|
|
||||||
case CREATE:
|
|
||||||
case DELETE:
|
|
||||||
case DISABLE:
|
|
||||||
case ENABLE:
|
|
||||||
return true;
|
|
||||||
case EDIT:
|
|
||||||
// we allow concurrent edit on the NS table
|
|
||||||
return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
|
|
||||||
case READ:
|
|
||||||
return false;
|
|
||||||
// region operations are using the shared-lock on the table
|
|
||||||
// and then they will grab an xlock on the region.
|
|
||||||
case REGION_SPLIT:
|
|
||||||
case REGION_MERGE:
|
|
||||||
case REGION_ASSIGN:
|
|
||||||
case REGION_UNASSIGN:
|
|
||||||
case REGION_EDIT:
|
|
||||||
case REGION_GC:
|
|
||||||
case MERGED_REGIONS_GC:
|
|
||||||
return false;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
throw new UnsupportedOperationException("unexpected type " +
|
|
||||||
proc.getTableOperationType());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get lock info for a resource of specified type and name and log details
|
* Get lock info for a resource of specified type and name and log details
|
||||||
*/
|
*/
|
||||||
protected void logLockedResource(LockedResourceType resourceType, String resourceName) {
|
private void logLockedResource(LockedResourceType resourceType, String resourceName) {
|
||||||
if (!LOG.isDebugEnabled()) {
|
if (!LOG.isDebugEnabled()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -765,7 +518,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
return waitTableQueueSharedLock(procedure, table) == null;
|
return waitTableQueueSharedLock(procedure, table) == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) {
|
private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
|
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
|
||||||
|
@ -821,7 +574,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
* other new operations pending for that table (e.g. a new create).
|
* other new operations pending for that table (e.g. a new create).
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) {
|
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
|
||||||
schedLock();
|
schedLock();
|
||||||
try {
|
try {
|
||||||
final TableQueue queue = getTableQueue(table);
|
final TableQueue queue = getTableQueue(table);
|
||||||
|
@ -1067,11 +820,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Peer Locking Helpers
|
// Peer Locking Helpers
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
|
|
||||||
return proc.getPeerOperationType() != PeerOperationType.REFRESH;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to acquire the exclusive lock on the specified peer.
|
* Try to acquire the exclusive lock on the specified peer.
|
||||||
* @see #wakePeerExclusiveLock(Procedure, String)
|
* @see #wakePeerExclusiveLock(Procedure, String)
|
||||||
|
@ -1114,278 +862,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
|
||||||
// Generic Helpers
|
|
||||||
// ============================================================================
|
|
||||||
private static abstract class Queue<TKey extends Comparable<TKey>>
|
|
||||||
extends AvlLinkedNode<Queue<TKey>> {
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param proc must not be null
|
|
||||||
*/
|
|
||||||
abstract boolean requireExclusiveLock(Procedure proc);
|
|
||||||
|
|
||||||
private final TKey key;
|
|
||||||
private final int priority;
|
|
||||||
private final ProcedureDeque runnables = new ProcedureDeque();
|
|
||||||
// Reference to status of lock on entity this queue represents.
|
|
||||||
private final LockStatus lockStatus;
|
|
||||||
|
|
||||||
public Queue(TKey key, LockStatus lockStatus) {
|
|
||||||
this(key, 1, lockStatus);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Queue(TKey key, int priority, LockStatus lockStatus) {
|
|
||||||
this.key = key;
|
|
||||||
this.priority = priority;
|
|
||||||
this.lockStatus = lockStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected TKey getKey() {
|
|
||||||
return key;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected int getPriority() {
|
|
||||||
return priority;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected LockStatus getLockStatus() {
|
|
||||||
return lockStatus;
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should go away when we have the new AM and its events
|
|
||||||
// and we move xlock to the lock-event-queue.
|
|
||||||
public boolean isAvailable() {
|
|
||||||
return !lockStatus.hasExclusiveLock() && !isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ======================================================================
|
|
||||||
// Functions to handle procedure queue
|
|
||||||
// ======================================================================
|
|
||||||
public void add(final Procedure proc, final boolean addToFront) {
|
|
||||||
if (addToFront) {
|
|
||||||
runnables.addFirst(proc);
|
|
||||||
} else {
|
|
||||||
runnables.addLast(proc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public Procedure peek() {
|
|
||||||
return runnables.peek();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Procedure poll() {
|
|
||||||
return runnables.poll();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isEmpty() {
|
|
||||||
return runnables.isEmpty();
|
|
||||||
}
|
|
||||||
|
|
||||||
public int size() {
|
|
||||||
return runnables.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ======================================================================
|
|
||||||
// Generic Helpers
|
|
||||||
// ======================================================================
|
|
||||||
public int compareKey(TKey cmpKey) {
|
|
||||||
return key.compareTo(cmpKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int compareTo(Queue<TKey> other) {
|
|
||||||
return compareKey(other.key);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)",
|
|
||||||
getClass().getSimpleName(), key,
|
|
||||||
lockStatus.hasExclusiveLock() ?
|
|
||||||
"true (" + lockStatus.getExclusiveLockProcIdOwner() + ")" : "false",
|
|
||||||
lockStatus.getSharedLockCount(), size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Locks on namespaces, tables, and regions.
|
|
||||||
* Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these
|
|
||||||
* locks.
|
|
||||||
*/
|
|
||||||
private static class SchemaLocking {
|
|
||||||
final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
|
|
||||||
final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
|
|
||||||
final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
|
|
||||||
// Single map for all regions irrespective of tables. Key is encoded region name.
|
|
||||||
final Map<String, LockAndQueue> regionLocks = new HashMap<>();
|
|
||||||
final Map<String, LockAndQueue> peerLocks = new HashMap<>();
|
|
||||||
|
|
||||||
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
|
|
||||||
LockAndQueue lock = map.get(key);
|
|
||||||
if (lock == null) {
|
|
||||||
lock = new LockAndQueue();
|
|
||||||
map.put(key, lock);
|
|
||||||
}
|
|
||||||
return lock;
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue getTableLock(TableName tableName) {
|
|
||||||
return getLock(tableLocks, tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue removeTableLock(TableName tableName) {
|
|
||||||
return tableLocks.remove(tableName);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue getNamespaceLock(String namespace) {
|
|
||||||
return getLock(namespaceLocks, namespace);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue getRegionLock(String encodedRegionName) {
|
|
||||||
return getLock(regionLocks, encodedRegionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue removeRegionLock(String encodedRegionName) {
|
|
||||||
return regionLocks.remove(encodedRegionName);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue getServerLock(ServerName serverName) {
|
|
||||||
return getLock(serverLocks, serverName);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue getPeerLock(String peerId) {
|
|
||||||
return getLock(peerLocks, peerId);
|
|
||||||
}
|
|
||||||
|
|
||||||
LockAndQueue removePeerLock(String peerId) {
|
|
||||||
return peerLocks.remove(peerId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes all locks by clearing the maps.
|
|
||||||
* Used when procedure executor is stopped for failure and recovery testing.
|
|
||||||
*/
|
|
||||||
@VisibleForTesting
|
|
||||||
void clear() {
|
|
||||||
serverLocks.clear();
|
|
||||||
namespaceLocks.clear();
|
|
||||||
tableLocks.clear();
|
|
||||||
regionLocks.clear();
|
|
||||||
peerLocks.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "serverLocks=" + filterUnlocked(this.serverLocks) +
|
|
||||||
", namespaceLocks=" + filterUnlocked(this.namespaceLocks) +
|
|
||||||
", tableLocks=" + filterUnlocked(this.tableLocks) +
|
|
||||||
", regionLocks=" + filterUnlocked(this.regionLocks) +
|
|
||||||
", peerLocks=" + filterUnlocked(this.peerLocks);
|
|
||||||
}
|
|
||||||
|
|
||||||
private String filterUnlocked(Map<?, LockAndQueue> locks) {
|
|
||||||
StringBuilder sb = new StringBuilder("{");
|
|
||||||
int initialLength = sb.length();
|
|
||||||
for (Map.Entry<?, LockAndQueue> entry: locks.entrySet()) {
|
|
||||||
if (!entry.getValue().isLocked()) continue;
|
|
||||||
if (sb.length() > initialLength) sb.append(", ");
|
|
||||||
sb.append("{");
|
|
||||||
sb.append(entry.getKey());
|
|
||||||
sb.append("=");
|
|
||||||
sb.append(entry.getValue());
|
|
||||||
sb.append("}");
|
|
||||||
}
|
|
||||||
sb.append("}");
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ======================================================================
|
|
||||||
// Helper Data Structures
|
|
||||||
// ======================================================================
|
|
||||||
|
|
||||||
private static class FairQueue<T extends Comparable<T>> {
|
|
||||||
private final int quantum;
|
|
||||||
|
|
||||||
private Queue<T> currentQueue = null;
|
|
||||||
private Queue<T> queueHead = null;
|
|
||||||
private int currentQuantum = 0;
|
|
||||||
private int size = 0;
|
|
||||||
|
|
||||||
public FairQueue() {
|
|
||||||
this(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public FairQueue(int quantum) {
|
|
||||||
this.quantum = quantum;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean hasRunnables() {
|
|
||||||
return size > 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void add(Queue<T> queue) {
|
|
||||||
queueHead = AvlIterableList.append(queueHead, queue);
|
|
||||||
if (currentQueue == null) setNextQueue(queueHead);
|
|
||||||
size++;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void remove(Queue<T> queue) {
|
|
||||||
Queue<T> nextQueue = AvlIterableList.readNext(queue);
|
|
||||||
queueHead = AvlIterableList.remove(queueHead, queue);
|
|
||||||
if (currentQueue == queue) {
|
|
||||||
setNextQueue(queueHead != null ? nextQueue : null);
|
|
||||||
}
|
|
||||||
size--;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Queue<T> poll() {
|
|
||||||
if (currentQuantum == 0) {
|
|
||||||
if (!nextQueue()) {
|
|
||||||
return null; // nothing here
|
|
||||||
}
|
|
||||||
currentQuantum = calculateQuantum(currentQueue) - 1;
|
|
||||||
} else {
|
|
||||||
currentQuantum--;
|
|
||||||
}
|
|
||||||
|
|
||||||
// This should go away when we have the new AM and its events
|
|
||||||
if (!currentQueue.isAvailable()) {
|
|
||||||
Queue<T> lastQueue = currentQueue;
|
|
||||||
do {
|
|
||||||
if (!nextQueue())
|
|
||||||
return null;
|
|
||||||
} while (currentQueue != lastQueue && !currentQueue.isAvailable());
|
|
||||||
|
|
||||||
currentQuantum = calculateQuantum(currentQueue) - 1;
|
|
||||||
}
|
|
||||||
return currentQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean nextQueue() {
|
|
||||||
if (currentQueue == null) return false;
|
|
||||||
currentQueue = AvlIterableList.readNext(currentQueue);
|
|
||||||
return currentQueue != null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setNextQueue(Queue<T> queue) {
|
|
||||||
currentQueue = queue;
|
|
||||||
if (queue != null) {
|
|
||||||
currentQuantum = calculateQuantum(currentQueue);
|
|
||||||
} else {
|
|
||||||
currentQuantum = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private int calculateQuantum(final Queue queue) {
|
|
||||||
return Math.max(1, queue.getPriority() * quantum); // TODO
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For debugging. Expensive.
|
* For debugging. Expensive.
|
||||||
* @throws IOException
|
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public String dumpLocks() throws IOException {
|
public String dumpLocks() throws IOException {
|
||||||
|
|
|
@ -15,28 +15,25 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
import org.apache.hadoop.hbase.master.MasterServices;
|
import org.apache.hadoop.hbase.master.MasterServices;
|
||||||
import org.apache.hadoop.hbase.procedure2.Procedure;
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.util.NonceKey;
|
import org.apache.hadoop.hbase.util.NonceKey;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public final class MasterProcedureUtil {
|
public final class MasterProcedureUtil {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureUtil.class);
|
|
||||||
|
|
||||||
private MasterProcedureUtil() {}
|
private MasterProcedureUtil() {}
|
||||||
|
|
||||||
|
@ -102,7 +99,7 @@ public final class MasterProcedureUtil {
|
||||||
protected abstract void run() throws IOException;
|
protected abstract void run() throws IOException;
|
||||||
protected abstract String getDescription();
|
protected abstract String getDescription();
|
||||||
|
|
||||||
protected long submitProcedure(final Procedure proc) {
|
protected long submitProcedure(final Procedure<?> proc) {
|
||||||
assert procId == null : "submitProcedure() was already called, running procId=" + procId;
|
assert procId == null : "submitProcedure() was already called, running procId=" + procId;
|
||||||
procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
|
procId = getProcedureExecutor().submitProcedure(proc, nonceKey);
|
||||||
return procId;
|
return procId;
|
||||||
|
@ -157,4 +154,27 @@ public final class MasterProcedureUtil {
|
||||||
public static boolean validateProcedureWALFilename(String filename) {
|
public static boolean validateProcedureWALFilename(String filename) {
|
||||||
return pattern.matcher(filename).matches();
|
return pattern.matcher(filename).matches();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the priority for the given table. Now meta table is 3, other system tables are 2, and
|
||||||
|
* user tables are 1.
|
||||||
|
*/
|
||||||
|
public static int getTablePriority(TableName tableName) {
|
||||||
|
if (TableName.isMetaTableName(tableName)) {
|
||||||
|
return 3;
|
||||||
|
} else if (tableName.isSystemTable()) {
|
||||||
|
return 2;
|
||||||
|
} else {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the total levels of table priority. Now we have 3 levels, for meta table, other system
|
||||||
|
* tables and user tables. Notice that the actual value of priority should be decreased from this
|
||||||
|
* value down to 1.
|
||||||
|
*/
|
||||||
|
public static int getTablePriorityLevels() {
|
||||||
|
return 3;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class PeerQueue extends Queue<String> {
|
||||||
|
|
||||||
|
public PeerQueue(String peerId, LockStatus lockStatus) {
|
||||||
|
super(peerId, lockStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAvailable() {
|
||||||
|
if (isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (getLockStatus().hasExclusiveLock()) {
|
||||||
|
// if we have an exclusive lock already taken
|
||||||
|
// only child of the lock owner can be executed
|
||||||
|
Procedure<?> nextProc = peek();
|
||||||
|
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requireExclusiveLock(Procedure<?> proc) {
|
||||||
|
return requirePeerExclusiveLock((PeerProcedureInterface) proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) {
|
||||||
|
return proc.getPeerOperationType() != PeerOperationType.REFRESH;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,115 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureDeque;
|
||||||
|
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
abstract class Queue<TKey extends Comparable<TKey>> extends AvlLinkedNode<Queue<TKey>> {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param proc must not be null
|
||||||
|
*/
|
||||||
|
abstract boolean requireExclusiveLock(Procedure<?> proc);
|
||||||
|
|
||||||
|
private final TKey key;
|
||||||
|
private final int priority;
|
||||||
|
private final ProcedureDeque runnables = new ProcedureDeque();
|
||||||
|
// Reference to status of lock on entity this queue represents.
|
||||||
|
private final LockStatus lockStatus;
|
||||||
|
|
||||||
|
protected Queue(TKey key, LockStatus lockStatus) {
|
||||||
|
this(key, 1, lockStatus);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Queue(TKey key, int priority, LockStatus lockStatus) {
|
||||||
|
assert priority >= 1 : "priority must be greater than or equal to 1";
|
||||||
|
this.key = key;
|
||||||
|
this.priority = priority;
|
||||||
|
this.lockStatus = lockStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected TKey getKey() {
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getPriority() {
|
||||||
|
return priority;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected LockStatus getLockStatus() {
|
||||||
|
return lockStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This should go away when we have the new AM and its events
|
||||||
|
// and we move xlock to the lock-event-queue.
|
||||||
|
public boolean isAvailable() {
|
||||||
|
return !lockStatus.hasExclusiveLock() && !isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ======================================================================
|
||||||
|
// Functions to handle procedure queue
|
||||||
|
// ======================================================================
|
||||||
|
public void add(Procedure<?> proc, boolean addToFront) {
|
||||||
|
if (addToFront) {
|
||||||
|
runnables.addFirst(proc);
|
||||||
|
} else {
|
||||||
|
runnables.addLast(proc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Procedure<?> peek() {
|
||||||
|
return runnables.peek();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Procedure<?> poll() {
|
||||||
|
return runnables.poll();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return runnables.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int size() {
|
||||||
|
return runnables.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ======================================================================
|
||||||
|
// Generic Helpers
|
||||||
|
// ======================================================================
|
||||||
|
public int compareKey(TKey cmpKey) {
|
||||||
|
return key.compareTo(cmpKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(Queue<TKey> other) {
|
||||||
|
return compareKey(other.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)", getClass().getSimpleName(), key,
|
||||||
|
lockStatus.hasExclusiveLock() ? "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")"
|
||||||
|
: "false",
|
||||||
|
lockStatus.getSharedLockCount(), size());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,214 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.master.locking.LockProcedure;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockType;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockedResource;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Locks on namespaces, tables, and regions.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these
|
||||||
|
* locks.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class SchemaLocking {
|
||||||
|
private final Map<ServerName, LockAndQueue> serverLocks = new HashMap<>();
|
||||||
|
private final Map<String, LockAndQueue> namespaceLocks = new HashMap<>();
|
||||||
|
private final Map<TableName, LockAndQueue> tableLocks = new HashMap<>();
|
||||||
|
// Single map for all regions irrespective of tables. Key is encoded region name.
|
||||||
|
private final Map<String, LockAndQueue> regionLocks = new HashMap<>();
|
||||||
|
private final Map<String, LockAndQueue> peerLocks = new HashMap<>();
|
||||||
|
|
||||||
|
private <T> LockAndQueue getLock(Map<T, LockAndQueue> map, T key) {
|
||||||
|
LockAndQueue lock = map.get(key);
|
||||||
|
if (lock == null) {
|
||||||
|
lock = new LockAndQueue();
|
||||||
|
map.put(key, lock);
|
||||||
|
}
|
||||||
|
return lock;
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue getTableLock(TableName tableName) {
|
||||||
|
return getLock(tableLocks, tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue removeTableLock(TableName tableName) {
|
||||||
|
return tableLocks.remove(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue getNamespaceLock(String namespace) {
|
||||||
|
return getLock(namespaceLocks, namespace);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue getRegionLock(String encodedRegionName) {
|
||||||
|
return getLock(regionLocks, encodedRegionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue removeRegionLock(String encodedRegionName) {
|
||||||
|
return regionLocks.remove(encodedRegionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue getServerLock(ServerName serverName) {
|
||||||
|
return getLock(serverLocks, serverName);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue getPeerLock(String peerId) {
|
||||||
|
return getLock(peerLocks, peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
LockAndQueue removePeerLock(String peerId) {
|
||||||
|
return peerLocks.remove(peerId);
|
||||||
|
}
|
||||||
|
|
||||||
|
private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName,
|
||||||
|
LockAndQueue queue) {
|
||||||
|
LockType lockType;
|
||||||
|
Procedure<?> exclusiveLockOwnerProcedure;
|
||||||
|
int sharedLockCount;
|
||||||
|
|
||||||
|
if (queue.hasExclusiveLock()) {
|
||||||
|
lockType = LockType.EXCLUSIVE;
|
||||||
|
exclusiveLockOwnerProcedure = queue.getExclusiveLockOwnerProcedure();
|
||||||
|
sharedLockCount = 0;
|
||||||
|
} else {
|
||||||
|
lockType = LockType.SHARED;
|
||||||
|
exclusiveLockOwnerProcedure = null;
|
||||||
|
sharedLockCount = queue.getSharedLockCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Procedure<?>> waitingProcedures = new ArrayList<>();
|
||||||
|
|
||||||
|
for (Procedure<?> procedure : queue) {
|
||||||
|
if (!(procedure instanceof LockProcedure)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
waitingProcedures.add(procedure);
|
||||||
|
}
|
||||||
|
|
||||||
|
return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure,
|
||||||
|
sharedLockCount, waitingProcedures);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> void addToLockedResources(List<LockedResource> lockedResources,
|
||||||
|
Map<T, LockAndQueue> locks, Function<T, String> keyTransformer,
|
||||||
|
LockedResourceType resourcesType) {
|
||||||
|
locks.entrySet().stream().filter(e -> e.getValue().isLocked())
|
||||||
|
.map(e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue()))
|
||||||
|
.forEachOrdered(lockedResources::add);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* List lock queues.
|
||||||
|
* @return the locks
|
||||||
|
*/
|
||||||
|
List<LockedResource> getLocks() {
|
||||||
|
List<LockedResource> lockedResources = new ArrayList<>();
|
||||||
|
addToLockedResources(lockedResources, serverLocks, sn -> sn.getServerName(),
|
||||||
|
LockedResourceType.SERVER);
|
||||||
|
addToLockedResources(lockedResources, namespaceLocks, Function.identity(),
|
||||||
|
LockedResourceType.NAMESPACE);
|
||||||
|
addToLockedResources(lockedResources, tableLocks, tn -> tn.getNameAsString(),
|
||||||
|
LockedResourceType.TABLE);
|
||||||
|
addToLockedResources(lockedResources, regionLocks, Function.identity(),
|
||||||
|
LockedResourceType.REGION);
|
||||||
|
addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER);
|
||||||
|
return lockedResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return {@link LockedResource} for resource of specified type & name. null if resource is not
|
||||||
|
* locked.
|
||||||
|
*/
|
||||||
|
LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
|
||||||
|
LockAndQueue queue;
|
||||||
|
switch (resourceType) {
|
||||||
|
case SERVER:
|
||||||
|
queue = serverLocks.get(ServerName.valueOf(resourceName));
|
||||||
|
break;
|
||||||
|
case NAMESPACE:
|
||||||
|
queue = namespaceLocks.get(resourceName);
|
||||||
|
break;
|
||||||
|
case TABLE:
|
||||||
|
queue = tableLocks.get(TableName.valueOf(resourceName));
|
||||||
|
break;
|
||||||
|
case REGION:
|
||||||
|
queue = regionLocks.get(resourceName);
|
||||||
|
break;
|
||||||
|
case PEER:
|
||||||
|
queue = peerLocks.get(resourceName);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
queue = null;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
return queue != null ? createLockedResource(resourceType, resourceName, queue) : null;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes all locks by clearing the maps. Used when procedure executor is stopped for failure and
|
||||||
|
* recovery testing.
|
||||||
|
*/
|
||||||
|
void clear() {
|
||||||
|
serverLocks.clear();
|
||||||
|
namespaceLocks.clear();
|
||||||
|
tableLocks.clear();
|
||||||
|
regionLocks.clear();
|
||||||
|
peerLocks.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "serverLocks=" + filterUnlocked(this.serverLocks) + ", namespaceLocks=" +
|
||||||
|
filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) +
|
||||||
|
", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks=" +
|
||||||
|
filterUnlocked(this.peerLocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String filterUnlocked(Map<?, LockAndQueue> locks) {
|
||||||
|
StringBuilder sb = new StringBuilder("{");
|
||||||
|
int initialLength = sb.length();
|
||||||
|
for (Map.Entry<?, LockAndQueue> entry : locks.entrySet()) {
|
||||||
|
if (!entry.getValue().isLocked()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (sb.length() > initialLength) {
|
||||||
|
sb.append(", ");
|
||||||
|
}
|
||||||
|
sb.append("{").append(entry.getKey()).append("=").append(entry.getValue()).append("}");
|
||||||
|
}
|
||||||
|
sb.append("}");
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class ServerQueue extends Queue<ServerName> {
|
||||||
|
|
||||||
|
public ServerQueue(ServerName serverName, LockStatus serverLock) {
|
||||||
|
super(serverName, serverLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requireExclusiveLock(Procedure<?> proc) {
|
||||||
|
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
|
||||||
|
switch (spi.getServerOperationType()) {
|
||||||
|
case CRASH_HANDLER:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,89 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.LockStatus;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.Procedure;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
class TableQueue extends Queue<TableName> {
|
||||||
|
private final LockStatus namespaceLockStatus;
|
||||||
|
|
||||||
|
public TableQueue(TableName tableName, int priority, LockStatus tableLock,
|
||||||
|
LockStatus namespaceLockStatus) {
|
||||||
|
super(tableName, priority, tableLock);
|
||||||
|
this.namespaceLockStatus = namespaceLockStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isAvailable() {
|
||||||
|
// if there are no items in the queue, or the namespace is locked.
|
||||||
|
// we can't execute operation on this table
|
||||||
|
if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (getLockStatus().hasExclusiveLock()) {
|
||||||
|
// if we have an exclusive lock already taken
|
||||||
|
// only child of the lock owner can be executed
|
||||||
|
final Procedure<?> nextProc = peek();
|
||||||
|
return nextProc != null && getLockStatus().hasLockAccess(nextProc);
|
||||||
|
}
|
||||||
|
|
||||||
|
// no xlock
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean requireExclusiveLock(Procedure<?> proc) {
|
||||||
|
return requireTableExclusiveLock((TableProcedureInterface) proc);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param proc must not be null
|
||||||
|
*/
|
||||||
|
private static boolean requireTableExclusiveLock(TableProcedureInterface proc) {
|
||||||
|
switch (proc.getTableOperationType()) {
|
||||||
|
case CREATE:
|
||||||
|
case DELETE:
|
||||||
|
case DISABLE:
|
||||||
|
case ENABLE:
|
||||||
|
return true;
|
||||||
|
case EDIT:
|
||||||
|
// we allow concurrent edit on the NS table
|
||||||
|
return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME);
|
||||||
|
case READ:
|
||||||
|
return false;
|
||||||
|
// region operations are using the shared-lock on the table
|
||||||
|
// and then they will grab an xlock on the region.
|
||||||
|
case REGION_SPLIT:
|
||||||
|
case REGION_MERGE:
|
||||||
|
case REGION_ASSIGN:
|
||||||
|
case REGION_UNASSIGN:
|
||||||
|
case REGION_EDIT:
|
||||||
|
case REGION_GC:
|
||||||
|
case MERGED_REGIONS_GC:
|
||||||
|
return false;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
throw new UnsupportedOperationException("unexpected type " + proc.getTableOperationType());
|
||||||
|
}
|
||||||
|
}
|
|
@ -256,7 +256,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected int doWork() throws Exception {
|
protected int doWork() throws Exception {
|
||||||
procedureScheduler = new MasterProcedureScheduler(UTIL.getConfiguration());
|
procedureScheduler = new MasterProcedureScheduler();
|
||||||
procedureScheduler.start();
|
procedureScheduler.start();
|
||||||
setupOperations();
|
setupOperations();
|
||||||
|
|
||||||
|
|
|
@ -17,16 +17,17 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.master.procedure;
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.reflect.Field;
|
import java.lang.reflect.Field;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
@ -60,15 +61,13 @@ public class TestMasterProcedureScheduler {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class);
|
||||||
|
|
||||||
private MasterProcedureScheduler queue;
|
private MasterProcedureScheduler queue;
|
||||||
private Configuration conf;
|
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
conf = HBaseConfiguration.create();
|
queue = new MasterProcedureScheduler();
|
||||||
queue = new MasterProcedureScheduler(conf);
|
|
||||||
queue.start();
|
queue.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -283,26 +282,24 @@ public class TestMasterProcedureScheduler {
|
||||||
// Fetch the 1st item and take the write lock
|
// Fetch the 1st item and take the write lock
|
||||||
Procedure procNs1 = queue.poll();
|
Procedure procNs1 = queue.poll();
|
||||||
assertEquals(1, procNs1.getProcId());
|
assertEquals(1, procNs1.getProcId());
|
||||||
assertEquals(false, queue.waitNamespaceExclusiveLock(procNs1, nsName1));
|
assertFalse(queue.waitNamespaceExclusiveLock(procNs1, nsName1));
|
||||||
|
|
||||||
// System tables have 2 as default priority
|
// namespace table has higher priority so we still return procedure for it
|
||||||
Procedure procNs2 = queue.poll();
|
Procedure procNs2 = queue.poll();
|
||||||
assertEquals(4, procNs2.getProcId());
|
assertEquals(4, procNs2.getProcId());
|
||||||
assertEquals(false, queue.waitNamespaceExclusiveLock(procNs2, nsName2));
|
assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
|
||||||
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
|
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
|
||||||
|
|
||||||
// add procNs2 back in the queue
|
// add procNs2 back in the queue
|
||||||
queue.yield(procNs2);
|
queue.yield(procNs2);
|
||||||
|
|
||||||
// table on ns1 is locked, so we get table on ns2
|
// again
|
||||||
procNs2 = queue.poll();
|
procNs2 = queue.poll();
|
||||||
assertEquals(3, procNs2.getProcId());
|
assertEquals(4, procNs2.getProcId());
|
||||||
assertEquals(false, queue.waitTableExclusiveLock(procNs2, tableName2));
|
assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2));
|
||||||
|
|
||||||
// ns2 is not available (TODO we may avoid this one)
|
// ns1 and ns2 are both locked so we get nothing
|
||||||
Procedure procNs2b = queue.poll();
|
assertNull(queue.poll());
|
||||||
assertEquals(4, procNs2b.getProcId());
|
|
||||||
assertEquals(true, queue.waitNamespaceExclusiveLock(procNs2b, nsName2));
|
|
||||||
|
|
||||||
// release the ns1 lock
|
// release the ns1 lock
|
||||||
queue.wakeNamespaceExclusiveLock(procNs1, nsName1);
|
queue.wakeNamespaceExclusiveLock(procNs1, nsName1);
|
||||||
|
@ -312,11 +309,11 @@ public class TestMasterProcedureScheduler {
|
||||||
assertEquals(2, procId);
|
assertEquals(2, procId);
|
||||||
|
|
||||||
// release ns2
|
// release ns2
|
||||||
queue.wakeTableExclusiveLock(procNs2, tableName2);
|
queue.wakeNamespaceExclusiveLock(procNs2, nsName2);
|
||||||
|
|
||||||
// we are now able to execute ns2
|
// we are now able to execute table of ns2
|
||||||
procId = queue.poll().getProcId();
|
procId = queue.poll().getProcId();
|
||||||
assertEquals(4, procId);
|
assertEquals(3, procId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -24,9 +24,7 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
|
import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType;
|
||||||
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
|
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure;
|
||||||
|
@ -53,12 +51,10 @@ public class TestMasterProcedureSchedulerConcurrency {
|
||||||
LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class);
|
LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class);
|
||||||
|
|
||||||
private MasterProcedureScheduler queue;
|
private MasterProcedureScheduler queue;
|
||||||
private Configuration conf;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
conf = HBaseConfiguration.create();
|
queue = new MasterProcedureScheduler();
|
||||||
queue = new MasterProcedureScheduler(conf);
|
|
||||||
queue.start();
|
queue.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue