From c18e7a963d9c4dc862c4706f128a4e436111669c Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 15 Feb 2018 13:49:54 +0800 Subject: [PATCH] HBASE-20000 Remove the quantum logic in FairQueue, always put high priority queue in front --- .../org/apache/hadoop/hbase/util/AvlUtil.java | 14 + .../hbase/master/procedure/FairQueue.java | 80 +++ .../master/procedure/MasterProcedureEnv.java | 2 +- .../procedure/MasterProcedureScheduler.java | 560 +----------------- .../master/procedure/MasterProcedureUtil.java | 38 +- .../hbase/master/procedure/PeerQueue.java | 54 ++ .../hadoop/hbase/master/procedure/Queue.java | 115 ++++ .../hbase/master/procedure/SchemaLocking.java | 214 +++++++ .../hbase/master/procedure/ServerQueue.java | 43 ++ .../hbase/master/procedure/TableQueue.java | 89 +++ ...ocedureSchedulerPerformanceEvaluation.java | 2 +- .../TestMasterProcedureScheduler.java | 35 +- ...stMasterProcedureSchedulerConcurrency.java | 6 +- 13 files changed, 676 insertions(+), 576 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java index 782336007f8..6b6eaefb9ae 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/AvlUtil.java @@ -548,6 +548,20 @@ public final class AvlUtil { return head; } + /** + * @param head the head of the linked list + * @param base the node which we want to add the {@code node} before it + * @param node the node which we want to add it before the {@code base} node + */ + public static TNode prepend(TNode head, TNode base, TNode node) { + assert !isLinked(node) : node + " is already linked"; + node.iterNext = base; + node.iterPrev = base.iterPrev; + base.iterPrev.iterNext = node; + base.iterPrev = node; + return head == base ? node : head; + } + /** * @param node the node to check * @return true if the node is linked to a list, false otherwise diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java new file mode 100644 index 00000000000..ac8e5775866 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/FairQueue.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class FairQueue> { + + private Queue queueHead = null; + private int size = 0; + + public boolean hasRunnables() { + return size > 0; + } + + public void add(Queue queue) { + // For normal priority queue, just append it to the tail + if (queueHead == null || queue.getPriority() == 1) { + queueHead = AvlIterableList.append(queueHead, queue); + size++; + return; + } + // Find the one which priority is less than us + // For now only TableQueue has priority, and there are only a small number of tables which + // have higher priority so this will not be an expensive operation. + Queue base = queueHead; + do { + if (base.getPriority() < queue.getPriority()) { + queueHead = AvlIterableList.prepend(queueHead, base, queue); + size++; + return; + } + base = AvlIterableList.readNext(base); + } while (base != queueHead); + // no one is lower than us, append to the tail + queueHead = AvlIterableList.append(queueHead, queue); + size++; + } + + public void remove(Queue queue) { + queueHead = AvlIterableList.remove(queueHead, queue); + size--; + } + + public Queue poll() { + if (queueHead == null) { + return null; + } + Queue q = queueHead; + do { + if (q.isAvailable()) { + if (q.getPriority() == 1) { + // for the normal priority queue, remove it and append it to the tail + queueHead = AvlIterableList.remove(queueHead, q); + queueHead = AvlIterableList.append(queueHead, q); + } + return q; + } + q = AvlIterableList.readNext(q); + } while (q != queueHead); + return null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index ae038a5443e..7fb187fe059 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -106,7 +106,7 @@ public class MasterProcedureEnv implements ConfigurationObserver { public MasterProcedureEnv(final MasterServices master, final RSProcedureDispatcher remoteDispatcher) { this.master = master; - this.procSched = new MasterProcedureScheduler(master.getConfiguration()); + this.procSched = new MasterProcedureScheduler(); this.remoteDispatcher = remoteDispatcher; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 936540db2d3..5cc92982b54 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -19,38 +19,27 @@ package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.function.Function; - -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.master.locking.LockProcedure; -import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import org.apache.hadoop.hbase.procedure2.LockAndQueue; -import org.apache.hadoop.hbase.procedure2.LockStatus; -import org.apache.hadoop.hbase.procedure2.LockType; import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.LockedResourceType; import org.apache.hadoop.hbase.procedure2.Procedure; -import org.apache.hadoop.hbase.procedure2.ProcedureDeque; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator; -import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -106,12 +95,12 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); - private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR = - new ServerQueueKeyComparator(); - private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR = - new TableQueueKeyComparator(); - private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR = - new PeerQueueKeyComparator(); + private static final AvlKeyComparator SERVER_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((ServerName) k); + private final static AvlKeyComparator TABLE_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((TableName) k); + private final static AvlKeyComparator PEER_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((String) k); private final FairQueue serverRunQueue = new FairQueue<>(); private final FairQueue tableRunQueue = new FairQueue<>(); @@ -123,39 +112,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { private final SchemaLocking locking = new SchemaLocking(); - /** - * Table priority is used when scheduling procedures from {@link #tableRunQueue}. A TableQueue - * with priority 2 will get its procedures scheduled at twice the rate as compared to - * TableQueue with priority 1. This should be enough to ensure system/meta get assigned out - * before user-space tables. HBASE-18109 is where we conclude what is here is good enough. - * Lets open new issue if we find it not enough. - */ - private static class TablePriorities { - final int metaTablePriority; - final int userTablePriority; - final int sysTablePriority; - - TablePriorities(Configuration conf) { - metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); - sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); - userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); - } - - int getPriority(TableName tableName) { - if (tableName.equals(TableName.META_TABLE_NAME)) { - return metaTablePriority; - } else if (tableName.isSystemTable()) { - return sysTablePriority; - } - return userTablePriority; - } - } - private final TablePriorities tablePriorities; - - public MasterProcedureScheduler(final Configuration conf) { - tablePriorities = new TablePriorities(conf); - } - @Override public void yield(final Procedure proc) { push(proc, isTableProcedure(proc), true); @@ -216,13 +172,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return pollResult; } - private > Procedure doPoll(final FairQueue fairq) { + private > Procedure doPoll(final FairQueue fairq) { final Queue rq = fairq.poll(); if (rq == null || !rq.isAvailable()) { return null; } - final Procedure pollResult = rq.peek(); + final Procedure pollResult = rq.peek(); if (pollResult == null) { return null; } @@ -240,7 +196,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // if the rq is in the fairq because of runnable child // check if the next procedure is still a child. // if not, remove the rq from the fairq and go back to the xlock state - Procedure nextProc = rq.peek(); + Procedure nextProc = rq.peek(); if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) { removeFromRunQueue(fairq, rq); } @@ -249,61 +205,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return pollResult; } - private LockedResource createLockedResource(LockedResourceType resourceType, - String resourceName, LockAndQueue queue) { - LockType lockType; - Procedure exclusiveLockOwnerProcedure; - int sharedLockCount; - - if (queue.hasExclusiveLock()) { - lockType = LockType.EXCLUSIVE; - exclusiveLockOwnerProcedure = queue.getExclusiveLockOwnerProcedure(); - sharedLockCount = 0; - } else { - lockType = LockType.SHARED; - exclusiveLockOwnerProcedure = null; - sharedLockCount = queue.getSharedLockCount(); - } - - List> waitingProcedures = new ArrayList<>(); - - for (Procedure procedure : queue) { - if (!(procedure instanceof LockProcedure)) { - continue; - } - - waitingProcedures.add(procedure); - } - - return new LockedResource(resourceType, resourceName, lockType, - exclusiveLockOwnerProcedure, sharedLockCount, waitingProcedures); - } - - private void addToLockedResources(List lockedResources, - Map locks, Function keyTransformer, - LockedResourceType resourcesType) { - locks.entrySet().stream().filter(e -> e.getValue().isLocked()) - .map( - e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue())) - .forEachOrdered(lockedResources::add); - } - @Override public List getLocks() { schedLock(); try { - List lockedResources = new ArrayList<>(); - addToLockedResources(lockedResources, locking.serverLocks, sn -> sn.getServerName(), - LockedResourceType.SERVER); - addToLockedResources(lockedResources, locking.namespaceLocks, Function.identity(), - LockedResourceType.NAMESPACE); - addToLockedResources(lockedResources, locking.tableLocks, tn -> tn.getNameAsString(), - LockedResourceType.TABLE); - addToLockedResources(lockedResources, locking.regionLocks, Function.identity(), - LockedResourceType.REGION); - addToLockedResources(lockedResources, locking.peerLocks, Function.identity(), - LockedResourceType.PEER); - return lockedResources; + return locking.getLocks(); } finally { schedUnlock(); } @@ -311,27 +217,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { @Override public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { - LockAndQueue queue = null; schedLock(); try { - switch (resourceType) { - case SERVER: - queue = locking.serverLocks.get(ServerName.valueOf(resourceName)); - break; - case NAMESPACE: - queue = locking.namespaceLocks.get(resourceName); - break; - case TABLE: - queue = locking.tableLocks.get(TableName.valueOf(resourceName)); - break; - case REGION: - queue = locking.regionLocks.get(resourceName); - break; - case PEER: - queue = locking.peerLocks.get(resourceName); - break; - } - return queue != null ? createLockedResource(resourceType, resourceName, queue) : null; + return locking.getLockResource(resourceType, resourceName); } finally { schedUnlock(); } @@ -348,7 +236,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - protected void clearQueue() { + private void clearQueue() { // Remove Servers for (int i = 0; i < serverBuckets.length; ++i) { clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR); @@ -450,7 +338,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR); if (node != null) return node; - node = new TableQueue(tableName, tablePriorities.getPriority(tableName), + node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName), locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString())); tableMap = AvlTree.insert(tableMap, node); return node; @@ -512,7 +400,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { locking.removePeerLock(peerId); } - private void tryCleanupPeerQueue(String peerId, Procedure procedure) { + private void tryCleanupPeerQueue(String peerId, Procedure procedure) { schedLock(); try { PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); @@ -538,148 +426,13 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return ((PeerProcedureInterface) proc).getPeerId(); } - // ============================================================================ - // Table and Server Queue Implementation - // ============================================================================ - private static class ServerQueueKeyComparator implements AvlKeyComparator { - @Override - public int compareKey(ServerQueue node, Object key) { - return node.compareKey((ServerName)key); - } - } - - public static class ServerQueue extends Queue { - public ServerQueue(ServerName serverName, LockStatus serverLock) { - super(serverName, serverLock); - } - - @Override - public boolean requireExclusiveLock(Procedure proc) { - ServerProcedureInterface spi = (ServerProcedureInterface)proc; - switch (spi.getServerOperationType()) { - case CRASH_HANDLER: - return true; - default: - break; - } - throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType()); - } - } - - private static class TableQueueKeyComparator implements AvlKeyComparator { - @Override - public int compareKey(TableQueue node, Object key) { - return node.compareKey((TableName)key); - } - } - - public static class TableQueue extends Queue { - private final LockStatus namespaceLockStatus; - - public TableQueue(TableName tableName, int priority, LockStatus tableLock, - LockStatus namespaceLockStatus) { - super(tableName, priority, tableLock); - this.namespaceLockStatus = namespaceLockStatus; - } - - @Override - public boolean isAvailable() { - // if there are no items in the queue, or the namespace is locked. - // we can't execute operation on this table - if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) { - return false; - } - - if (getLockStatus().hasExclusiveLock()) { - // if we have an exclusive lock already taken - // only child of the lock owner can be executed - final Procedure nextProc = peek(); - return nextProc != null && getLockStatus().hasLockAccess(nextProc); - } - - // no xlock - return true; - } - - @Override - public boolean requireExclusiveLock(Procedure proc) { - return requireTableExclusiveLock((TableProcedureInterface)proc); - } - } - - private static class PeerQueueKeyComparator implements AvlKeyComparator { - - @Override - public int compareKey(PeerQueue node, Object key) { - return node.compareKey((String) key); - } - } - - public static class PeerQueue extends Queue { - - public PeerQueue(String peerId, LockStatus lockStatus) { - super(peerId, lockStatus); - } - - @Override - public boolean requireExclusiveLock(Procedure proc) { - return requirePeerExclusiveLock((PeerProcedureInterface) proc); - } - - @Override - public boolean isAvailable() { - if (isEmpty()) { - return false; - } - if (getLockStatus().hasExclusiveLock()) { - // if we have an exclusive lock already taken - // only child of the lock owner can be executed - Procedure nextProc = peek(); - return nextProc != null && getLockStatus().hasLockAccess(nextProc); - } - return true; - } - } - // ============================================================================ // Table Locking Helpers // ============================================================================ - /** - * @param proc must not be null - */ - private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { - switch (proc.getTableOperationType()) { - case CREATE: - case DELETE: - case DISABLE: - case ENABLE: - return true; - case EDIT: - // we allow concurrent edit on the NS table - return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); - case READ: - return false; - // region operations are using the shared-lock on the table - // and then they will grab an xlock on the region. - case REGION_SPLIT: - case REGION_MERGE: - case REGION_ASSIGN: - case REGION_UNASSIGN: - case REGION_EDIT: - case REGION_GC: - case MERGED_REGIONS_GC: - return false; - default: - break; - } - throw new UnsupportedOperationException("unexpected type " + - proc.getTableOperationType()); - } - /** * Get lock info for a resource of specified type and name and log details */ - protected void logLockedResource(LockedResourceType resourceType, String resourceName) { + private void logLockedResource(LockedResourceType resourceType, String resourceName) { if (!LOG.isDebugEnabled()) { return; } @@ -765,7 +518,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return waitTableQueueSharedLock(procedure, table) == null; } - private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) { + private TableQueue waitTableQueueSharedLock(final Procedure procedure, final TableName table) { schedLock(); try { final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString()); @@ -821,7 +574,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * other new operations pending for that table (e.g. a new create). */ @VisibleForTesting - protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) { + boolean markTableAsDeleted(final TableName table, final Procedure procedure) { schedLock(); try { final TableQueue queue = getTableQueue(table); @@ -1067,11 +820,6 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // ============================================================================ // Peer Locking Helpers // ============================================================================ - - private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { - return proc.getPeerOperationType() != PeerOperationType.REFRESH; - } - /** * Try to acquire the exclusive lock on the specified peer. * @see #wakePeerExclusiveLock(Procedure, String) @@ -1114,279 +862,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } - // ============================================================================ - // Generic Helpers - // ============================================================================ - private static abstract class Queue> - extends AvlLinkedNode> { - - /** - * @param proc must not be null - */ - abstract boolean requireExclusiveLock(Procedure proc); - - private final TKey key; - private final int priority; - private final ProcedureDeque runnables = new ProcedureDeque(); - // Reference to status of lock on entity this queue represents. - private final LockStatus lockStatus; - - public Queue(TKey key, LockStatus lockStatus) { - this(key, 1, lockStatus); - } - - public Queue(TKey key, int priority, LockStatus lockStatus) { - this.key = key; - this.priority = priority; - this.lockStatus = lockStatus; - } - - protected TKey getKey() { - return key; - } - - protected int getPriority() { - return priority; - } - - protected LockStatus getLockStatus() { - return lockStatus; - } - - // This should go away when we have the new AM and its events - // and we move xlock to the lock-event-queue. - public boolean isAvailable() { - return !lockStatus.hasExclusiveLock() && !isEmpty(); - } - - // ====================================================================== - // Functions to handle procedure queue - // ====================================================================== - public void add(final Procedure proc, final boolean addToFront) { - if (addToFront) { - runnables.addFirst(proc); - } else { - runnables.addLast(proc); - } - } - - public Procedure peek() { - return runnables.peek(); - } - - public Procedure poll() { - return runnables.poll(); - } - - public boolean isEmpty() { - return runnables.isEmpty(); - } - - public int size() { - return runnables.size(); - } - - // ====================================================================== - // Generic Helpers - // ====================================================================== - public int compareKey(TKey cmpKey) { - return key.compareTo(cmpKey); - } - - @Override - public int compareTo(Queue other) { - return compareKey(other.key); - } - - @Override - public String toString() { - return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)", - getClass().getSimpleName(), key, - lockStatus.hasExclusiveLock() ? - "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")" : "false", - lockStatus.getSharedLockCount(), size()); - } - } - - /** - * Locks on namespaces, tables, and regions. - * Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these - * locks. - */ - private static class SchemaLocking { - final Map serverLocks = new HashMap<>(); - final Map namespaceLocks = new HashMap<>(); - final Map tableLocks = new HashMap<>(); - // Single map for all regions irrespective of tables. Key is encoded region name. - final Map regionLocks = new HashMap<>(); - final Map peerLocks = new HashMap<>(); - - private LockAndQueue getLock(Map map, T key) { - LockAndQueue lock = map.get(key); - if (lock == null) { - lock = new LockAndQueue(); - map.put(key, lock); - } - return lock; - } - - LockAndQueue getTableLock(TableName tableName) { - return getLock(tableLocks, tableName); - } - - LockAndQueue removeTableLock(TableName tableName) { - return tableLocks.remove(tableName); - } - - LockAndQueue getNamespaceLock(String namespace) { - return getLock(namespaceLocks, namespace); - } - - LockAndQueue getRegionLock(String encodedRegionName) { - return getLock(regionLocks, encodedRegionName); - } - - LockAndQueue removeRegionLock(String encodedRegionName) { - return regionLocks.remove(encodedRegionName); - } - - LockAndQueue getServerLock(ServerName serverName) { - return getLock(serverLocks, serverName); - } - - LockAndQueue getPeerLock(String peerId) { - return getLock(peerLocks, peerId); - } - - LockAndQueue removePeerLock(String peerId) { - return peerLocks.remove(peerId); - } - - /** - * Removes all locks by clearing the maps. - * Used when procedure executor is stopped for failure and recovery testing. - */ - @VisibleForTesting - void clear() { - serverLocks.clear(); - namespaceLocks.clear(); - tableLocks.clear(); - regionLocks.clear(); - peerLocks.clear(); - } - - @Override - public String toString() { - return "serverLocks=" + filterUnlocked(this.serverLocks) + - ", namespaceLocks=" + filterUnlocked(this.namespaceLocks) + - ", tableLocks=" + filterUnlocked(this.tableLocks) + - ", regionLocks=" + filterUnlocked(this.regionLocks) + - ", peerLocks=" + filterUnlocked(this.peerLocks); - } - - private String filterUnlocked(Map locks) { - StringBuilder sb = new StringBuilder("{"); - int initialLength = sb.length(); - for (Map.Entry entry: locks.entrySet()) { - if (!entry.getValue().isLocked()) continue; - if (sb.length() > initialLength) sb.append(", "); - sb.append("{"); - sb.append(entry.getKey()); - sb.append("="); - sb.append(entry.getValue()); - sb.append("}"); - } - sb.append("}"); - return sb.toString(); - } - } - - // ====================================================================== - // Helper Data Structures - // ====================================================================== - - private static class FairQueue> { - private final int quantum; - - private Queue currentQueue = null; - private Queue queueHead = null; - private int currentQuantum = 0; - private int size = 0; - - public FairQueue() { - this(1); - } - - public FairQueue(int quantum) { - this.quantum = quantum; - } - - public boolean hasRunnables() { - return size > 0; - } - - public void add(Queue queue) { - queueHead = AvlIterableList.append(queueHead, queue); - if (currentQueue == null) setNextQueue(queueHead); - size++; - } - - public void remove(Queue queue) { - Queue nextQueue = AvlIterableList.readNext(queue); - queueHead = AvlIterableList.remove(queueHead, queue); - if (currentQueue == queue) { - setNextQueue(queueHead != null ? nextQueue : null); - } - size--; - } - - public Queue poll() { - if (currentQuantum == 0) { - if (!nextQueue()) { - return null; // nothing here - } - currentQuantum = calculateQuantum(currentQueue) - 1; - } else { - currentQuantum--; - } - - // This should go away when we have the new AM and its events - if (!currentQueue.isAvailable()) { - Queue lastQueue = currentQueue; - do { - if (!nextQueue()) - return null; - } while (currentQueue != lastQueue && !currentQueue.isAvailable()); - - currentQuantum = calculateQuantum(currentQueue) - 1; - } - return currentQueue; - } - - private boolean nextQueue() { - if (currentQueue == null) return false; - currentQueue = AvlIterableList.readNext(currentQueue); - return currentQueue != null; - } - - private void setNextQueue(Queue queue) { - currentQueue = queue; - if (queue != null) { - currentQuantum = calculateQuantum(currentQueue); - } else { - currentQuantum = 0; - } - } - - private int calculateQuantum(final Queue queue) { - return Math.max(1, queue.getPriority() * quantum); // TODO - } - } - /** * For debugging. Expensive. - * @throws IOException - */ + */ @VisibleForTesting public String dumpLocks() throws IOException { schedLock(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java index 4afd711cd91..51e24523b3b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureUtil.java @@ -15,28 +15,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.hadoop.hbase.master.procedure; import java.io.IOException; import java.util.regex.Pattern; - -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; @InterfaceAudience.Private @InterfaceStability.Evolving public final class MasterProcedureUtil { - private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureUtil.class); private MasterProcedureUtil() {} @@ -102,7 +99,7 @@ public final class MasterProcedureUtil { protected abstract void run() throws IOException; protected abstract String getDescription(); - protected long submitProcedure(final Procedure proc) { + protected long submitProcedure(final Procedure proc) { assert procId == null : "submitProcedure() was already called, running procId=" + procId; procId = getProcedureExecutor().submitProcedure(proc, nonceKey); return procId; @@ -157,4 +154,27 @@ public final class MasterProcedureUtil { public static boolean validateProcedureWALFilename(String filename) { return pattern.matcher(filename).matches(); } + + /** + * Return the priority for the given table. Now meta table is 3, other system tables are 2, and + * user tables are 1. + */ + public static int getTablePriority(TableName tableName) { + if (TableName.isMetaTableName(tableName)) { + return 3; + } else if (tableName.isSystemTable()) { + return 2; + } else { + return 1; + } + } + + /** + * Return the total levels of table priority. Now we have 3 levels, for meta table, other system + * tables and user tables. Notice that the actual value of priority should be decreased from this + * value down to 1. + */ + public static int getTablePriorityLevels() { + return 3; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java new file mode 100644 index 00000000000..1ae0c2f1177 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerQueue.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class PeerQueue extends Queue { + + public PeerQueue(String peerId, LockStatus lockStatus) { + super(peerId, lockStatus); + } + + @Override + public boolean isAvailable() { + if (isEmpty()) { + return false; + } + if (getLockStatus().hasExclusiveLock()) { + // if we have an exclusive lock already taken + // only child of the lock owner can be executed + Procedure nextProc = peek(); + return nextProc != null && getLockStatus().hasLockAccess(nextProc); + } + return true; + } + + @Override + public boolean requireExclusiveLock(Procedure proc) { + return requirePeerExclusiveLock((PeerProcedureInterface) proc); + } + + private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { + return proc.getPeerOperationType() != PeerOperationType.REFRESH; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java new file mode 100644 index 00000000000..f7bea2add27 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/Queue.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureDeque; +import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +abstract class Queue> extends AvlLinkedNode> { + + /** + * @param proc must not be null + */ + abstract boolean requireExclusiveLock(Procedure proc); + + private final TKey key; + private final int priority; + private final ProcedureDeque runnables = new ProcedureDeque(); + // Reference to status of lock on entity this queue represents. + private final LockStatus lockStatus; + + protected Queue(TKey key, LockStatus lockStatus) { + this(key, 1, lockStatus); + } + + protected Queue(TKey key, int priority, LockStatus lockStatus) { + assert priority >= 1 : "priority must be greater than or equal to 1"; + this.key = key; + this.priority = priority; + this.lockStatus = lockStatus; + } + + protected TKey getKey() { + return key; + } + + public int getPriority() { + return priority; + } + + protected LockStatus getLockStatus() { + return lockStatus; + } + + // This should go away when we have the new AM and its events + // and we move xlock to the lock-event-queue. + public boolean isAvailable() { + return !lockStatus.hasExclusiveLock() && !isEmpty(); + } + + // ====================================================================== + // Functions to handle procedure queue + // ====================================================================== + public void add(Procedure proc, boolean addToFront) { + if (addToFront) { + runnables.addFirst(proc); + } else { + runnables.addLast(proc); + } + } + + public Procedure peek() { + return runnables.peek(); + } + + public Procedure poll() { + return runnables.poll(); + } + + public boolean isEmpty() { + return runnables.isEmpty(); + } + + public int size() { + return runnables.size(); + } + + // ====================================================================== + // Generic Helpers + // ====================================================================== + public int compareKey(TKey cmpKey) { + return key.compareTo(cmpKey); + } + + @Override + public int compareTo(Queue other) { + return compareKey(other.key); + } + + @Override + public String toString() { + return String.format("%s(%s, xlock=%s sharedLock=%s size=%s)", getClass().getSimpleName(), key, + lockStatus.hasExclusiveLock() ? "true (" + lockStatus.getExclusiveLockProcIdOwner() + ")" + : "false", + lockStatus.getSharedLockCount(), size()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java new file mode 100644 index 00000000000..5dcc1218c2b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -0,0 +1,214 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.locking.LockProcedure; +import org.apache.hadoop.hbase.procedure2.LockAndQueue; +import org.apache.hadoop.hbase.procedure2.LockType; +import org.apache.hadoop.hbase.procedure2.LockedResource; +import org.apache.hadoop.hbase.procedure2.LockedResourceType; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +/** + *

+ * Locks on namespaces, tables, and regions. + *

+ *

+ * Since LockAndQueue implementation is NOT thread-safe, schedLock() guards all calls to these + * locks. + *

+ */ +@InterfaceAudience.Private +class SchemaLocking { + private final Map serverLocks = new HashMap<>(); + private final Map namespaceLocks = new HashMap<>(); + private final Map tableLocks = new HashMap<>(); + // Single map for all regions irrespective of tables. Key is encoded region name. + private final Map regionLocks = new HashMap<>(); + private final Map peerLocks = new HashMap<>(); + + private LockAndQueue getLock(Map map, T key) { + LockAndQueue lock = map.get(key); + if (lock == null) { + lock = new LockAndQueue(); + map.put(key, lock); + } + return lock; + } + + LockAndQueue getTableLock(TableName tableName) { + return getLock(tableLocks, tableName); + } + + LockAndQueue removeTableLock(TableName tableName) { + return tableLocks.remove(tableName); + } + + LockAndQueue getNamespaceLock(String namespace) { + return getLock(namespaceLocks, namespace); + } + + LockAndQueue getRegionLock(String encodedRegionName) { + return getLock(regionLocks, encodedRegionName); + } + + LockAndQueue removeRegionLock(String encodedRegionName) { + return regionLocks.remove(encodedRegionName); + } + + LockAndQueue getServerLock(ServerName serverName) { + return getLock(serverLocks, serverName); + } + + LockAndQueue getPeerLock(String peerId) { + return getLock(peerLocks, peerId); + } + + LockAndQueue removePeerLock(String peerId) { + return peerLocks.remove(peerId); + } + + private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName, + LockAndQueue queue) { + LockType lockType; + Procedure exclusiveLockOwnerProcedure; + int sharedLockCount; + + if (queue.hasExclusiveLock()) { + lockType = LockType.EXCLUSIVE; + exclusiveLockOwnerProcedure = queue.getExclusiveLockOwnerProcedure(); + sharedLockCount = 0; + } else { + lockType = LockType.SHARED; + exclusiveLockOwnerProcedure = null; + sharedLockCount = queue.getSharedLockCount(); + } + + List> waitingProcedures = new ArrayList<>(); + + for (Procedure procedure : queue) { + if (!(procedure instanceof LockProcedure)) { + continue; + } + + waitingProcedures.add(procedure); + } + + return new LockedResource(resourceType, resourceName, lockType, exclusiveLockOwnerProcedure, + sharedLockCount, waitingProcedures); + } + + private void addToLockedResources(List lockedResources, + Map locks, Function keyTransformer, + LockedResourceType resourcesType) { + locks.entrySet().stream().filter(e -> e.getValue().isLocked()) + .map(e -> createLockedResource(resourcesType, keyTransformer.apply(e.getKey()), e.getValue())) + .forEachOrdered(lockedResources::add); + } + + /** + * List lock queues. + * @return the locks + */ + List getLocks() { + List lockedResources = new ArrayList<>(); + addToLockedResources(lockedResources, serverLocks, sn -> sn.getServerName(), + LockedResourceType.SERVER); + addToLockedResources(lockedResources, namespaceLocks, Function.identity(), + LockedResourceType.NAMESPACE); + addToLockedResources(lockedResources, tableLocks, tn -> tn.getNameAsString(), + LockedResourceType.TABLE); + addToLockedResources(lockedResources, regionLocks, Function.identity(), + LockedResourceType.REGION); + addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER); + return lockedResources; + } + + /** + * @return {@link LockedResource} for resource of specified type & name. null if resource is not + * locked. + */ + LockedResource getLockResource(LockedResourceType resourceType, String resourceName) { + LockAndQueue queue; + switch (resourceType) { + case SERVER: + queue = serverLocks.get(ServerName.valueOf(resourceName)); + break; + case NAMESPACE: + queue = namespaceLocks.get(resourceName); + break; + case TABLE: + queue = tableLocks.get(TableName.valueOf(resourceName)); + break; + case REGION: + queue = regionLocks.get(resourceName); + break; + case PEER: + queue = peerLocks.get(resourceName); + break; + default: + queue = null; + break; + } + return queue != null ? createLockedResource(resourceType, resourceName, queue) : null; + } + + /** + * Removes all locks by clearing the maps. Used when procedure executor is stopped for failure and + * recovery testing. + */ + void clear() { + serverLocks.clear(); + namespaceLocks.clear(); + tableLocks.clear(); + regionLocks.clear(); + peerLocks.clear(); + } + + @Override + public String toString() { + return "serverLocks=" + filterUnlocked(this.serverLocks) + ", namespaceLocks=" + + filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) + + ", regionLocks=" + filterUnlocked(this.regionLocks) + ", peerLocks=" + + filterUnlocked(this.peerLocks); + } + + private String filterUnlocked(Map locks) { + StringBuilder sb = new StringBuilder("{"); + int initialLength = sb.length(); + for (Map.Entry entry : locks.entrySet()) { + if (!entry.getValue().isLocked()) { + continue; + } + if (sb.length() > initialLength) { + sb.append(", "); + } + sb.append("{").append(entry.getKey()).append("=").append(entry.getValue()).append("}"); + } + sb.append("}"); + return sb.toString(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java new file mode 100644 index 00000000000..5526f3b5d0f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class ServerQueue extends Queue { + + public ServerQueue(ServerName serverName, LockStatus serverLock) { + super(serverName, serverLock); + } + + @Override + public boolean requireExclusiveLock(Procedure proc) { + ServerProcedureInterface spi = (ServerProcedureInterface) proc; + switch (spi.getServerOperationType()) { + case CRASH_HANDLER: + return true; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java new file mode 100644 index 00000000000..106dfc3ea47 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/TableQueue.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +class TableQueue extends Queue { + private final LockStatus namespaceLockStatus; + + public TableQueue(TableName tableName, int priority, LockStatus tableLock, + LockStatus namespaceLockStatus) { + super(tableName, priority, tableLock); + this.namespaceLockStatus = namespaceLockStatus; + } + + @Override + public boolean isAvailable() { + // if there are no items in the queue, or the namespace is locked. + // we can't execute operation on this table + if (isEmpty() || namespaceLockStatus.hasExclusiveLock()) { + return false; + } + + if (getLockStatus().hasExclusiveLock()) { + // if we have an exclusive lock already taken + // only child of the lock owner can be executed + final Procedure nextProc = peek(); + return nextProc != null && getLockStatus().hasLockAccess(nextProc); + } + + // no xlock + return true; + } + + @Override + public boolean requireExclusiveLock(Procedure proc) { + return requireTableExclusiveLock((TableProcedureInterface) proc); + } + + /** + * @param proc must not be null + */ + private static boolean requireTableExclusiveLock(TableProcedureInterface proc) { + switch (proc.getTableOperationType()) { + case CREATE: + case DELETE: + case DISABLE: + case ENABLE: + return true; + case EDIT: + // we allow concurrent edit on the NS table + return !proc.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); + case READ: + return false; + // region operations are using the shared-lock on the table + // and then they will grab an xlock on the region. + case REGION_SPLIT: + case REGION_MERGE: + case REGION_ASSIGN: + case REGION_UNASSIGN: + case REGION_EDIT: + case REGION_GC: + case MERGED_REGIONS_GC: + return false; + default: + break; + } + throw new UnsupportedOperationException("unexpected type " + proc.getTableOperationType()); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java index e5d3a7944d7..d86d083c332 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureSchedulerPerformanceEvaluation.java @@ -256,7 +256,7 @@ public class MasterProcedureSchedulerPerformanceEvaluation extends AbstractHBase @Override protected int doWork() throws Exception { - procedureScheduler = new MasterProcedureScheduler(UTIL.getConfiguration()); + procedureScheduler = new MasterProcedureScheduler(); procedureScheduler.start(); setupOperations(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java index 05bb637d382..65757db3952 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureScheduler.java @@ -17,16 +17,17 @@ */ package org.apache.hadoop.hbase.master.procedure; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -60,15 +61,13 @@ public class TestMasterProcedureScheduler { private static final Logger LOG = LoggerFactory.getLogger(TestMasterProcedureScheduler.class); private MasterProcedureScheduler queue; - private Configuration conf; @Rule public TestName name = new TestName(); @Before public void setUp() throws IOException { - conf = HBaseConfiguration.create(); - queue = new MasterProcedureScheduler(conf); + queue = new MasterProcedureScheduler(); queue.start(); } @@ -283,26 +282,24 @@ public class TestMasterProcedureScheduler { // Fetch the 1st item and take the write lock Procedure procNs1 = queue.poll(); assertEquals(1, procNs1.getProcId()); - assertEquals(false, queue.waitNamespaceExclusiveLock(procNs1, nsName1)); + assertFalse(queue.waitNamespaceExclusiveLock(procNs1, nsName1)); - // System tables have 2 as default priority + // namespace table has higher priority so we still return procedure for it Procedure procNs2 = queue.poll(); assertEquals(4, procNs2.getProcId()); - assertEquals(false, queue.waitNamespaceExclusiveLock(procNs2, nsName2)); + assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2)); queue.wakeNamespaceExclusiveLock(procNs2, nsName2); // add procNs2 back in the queue queue.yield(procNs2); - // table on ns1 is locked, so we get table on ns2 + // again procNs2 = queue.poll(); - assertEquals(3, procNs2.getProcId()); - assertEquals(false, queue.waitTableExclusiveLock(procNs2, tableName2)); + assertEquals(4, procNs2.getProcId()); + assertFalse(queue.waitNamespaceExclusiveLock(procNs2, nsName2)); - // ns2 is not available (TODO we may avoid this one) - Procedure procNs2b = queue.poll(); - assertEquals(4, procNs2b.getProcId()); - assertEquals(true, queue.waitNamespaceExclusiveLock(procNs2b, nsName2)); + // ns1 and ns2 are both locked so we get nothing + assertNull(queue.poll()); // release the ns1 lock queue.wakeNamespaceExclusiveLock(procNs1, nsName1); @@ -312,11 +309,11 @@ public class TestMasterProcedureScheduler { assertEquals(2, procId); // release ns2 - queue.wakeTableExclusiveLock(procNs2, tableName2); + queue.wakeNamespaceExclusiveLock(procNs2, nsName2); - // we are now able to execute ns2 + // we are now able to execute table of ns2 procId = queue.poll().getProcId(); - assertEquals(4, procId); + assertEquals(3, procId); } @Test diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java index 9c5b602160a..1313cdba853 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureSchedulerConcurrency.java @@ -24,9 +24,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashSet; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestPeerProcedure; @@ -53,12 +51,10 @@ public class TestMasterProcedureSchedulerConcurrency { LoggerFactory.getLogger(TestMasterProcedureSchedulerConcurrency.class); private MasterProcedureScheduler queue; - private Configuration conf; @Before public void setUp() throws IOException { - conf = HBaseConfiguration.create(); - queue = new MasterProcedureScheduler(conf); + queue = new MasterProcedureScheduler(); queue.start(); }