HBASE-16082 Procedure v2 - Move out helpers from MasterProcedureScheduler

This commit is contained in:
Matteo Bertozzi 2016-08-30 09:44:35 -07:00
parent 0f92e943ac
commit 2acd788dce
3 changed files with 889 additions and 228 deletions

View File

@ -0,0 +1,559 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Helper class that allows to create and manipulate an AvlTree.
* The main utility is in cases where over time we have a lot of add/remove of the same object,
* and we want to avoid all the allocations/deallocations of the "node" objects that the
* java containers will create.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class AvlUtil {
private AvlUtil() {}
/**
* This class represent a node that will be used in an AvlTree.
* Instead of creating another object for the tree node,
* like the TreeMap and the other java contains, here the node can be extended
* and the content can be embedded directly in the node itself.
* This is useful in cases where over time we have a lot of add/remove of the same object.
*/
@InterfaceAudience.Private
public static abstract class AvlNode<TNode extends AvlNode> {
protected TNode avlLeft;
protected TNode avlRight;
protected int avlHeight;
public abstract int compareTo(TNode other);
}
/**
* This class extends the AvlNode and adds two links that will be used in conjunction
* with the AvlIterableList class.
* This is useful in situations where your node must be in a map to have a quick lookup by key,
* but it also require to be in something like a list/queue.
* This is useful in cases where over time we have a lot of add/remove of the same object.
*/
@InterfaceAudience.Private
public static abstract class AvlLinkedNode<TNode extends AvlLinkedNode> extends AvlNode<TNode> {
protected TNode iterNext = null;
protected TNode iterPrev = null;
}
@InterfaceAudience.Private
public interface AvlInsertOrReplace<TNode extends AvlNode> {
TNode insert(Object searchKey);
TNode replace(Object searchKey, TNode prevNode);
}
/**
* The AvlTree allows to lookup an object using a custom key.
* e.g. the java Map allows only to lookup by key using the Comparator
* specified in the constructor.
* In this case you can pass a specific comparator for every needs.
*/
@InterfaceAudience.Private
public static interface AvlKeyComparator<TNode extends AvlNode> {
int compareKey(TNode node, Object key);
}
/**
* Visitor that allows to traverse a set of AvlNodes.
* If you don't like the callback style of the visitor you can always use the AvlTreeIterator.
*/
@InterfaceAudience.Private
public static interface AvlNodeVisitor<TNode extends AvlNode> {
/**
* @param node the node that we are currently visiting
* @return false to stop the iteration. true to continue.
*/
boolean visitNode(TNode node);
}
/**
* Helper class that allows to create and manipulate an AVL Tree
*/
@InterfaceAudience.Private
public static class AvlTree {
/**
* @param root the current root of the tree
* @param key the key for the node we are trying to find
* @param keyComparator the comparator to use to match node and key
* @return the node that matches the specified key or null in case of node not found.
*/
public static <TNode extends AvlNode> TNode get(TNode root, final Object key,
final AvlKeyComparator<TNode> keyComparator) {
while (root != null) {
int cmp = keyComparator.compareKey(root, key);
if (cmp > 0) {
root = (TNode)root.avlLeft;
} else if (cmp < 0) {
root = (TNode)root.avlRight;
} else {
return (TNode)root;
}
}
return null;
}
/**
* @param root the current root of the tree
* @return the first (min) node of the tree
*/
public static <TNode extends AvlNode> TNode getFirst(TNode root) {
if (root != null) {
while (root.avlLeft != null) {
root = (TNode)root.avlLeft;
}
}
return root;
}
/**
* @param root the current root of the tree
* @return the last (max) node of the tree
*/
public static <TNode extends AvlNode> TNode getLast(TNode root) {
if (root != null) {
while (root.avlRight != null) {
root = (TNode)root.avlRight;
}
}
return root;
}
/**
* Insert a node into the tree. It uses the AvlNode.compareTo() for ordering.
* NOTE: The node must not be already in the tree.
* @param root the current root of the tree
* @param node the node to insert
* @return the new root of the tree
*/
public static <TNode extends AvlNode> TNode insert(TNode root, TNode node) {
if (root == null) return node;
int cmp = node.compareTo(root);
assert cmp != 0 : "node already inserted: " + root;
if (cmp < 0) {
root.avlLeft = insert(root.avlLeft, node);
} else {
root.avlRight = insert(root.avlRight, node);
}
return balance(root);
}
/**
* Insert a node into the tree.
* This is useful when you want to create a new node or replace the content
* depending if the node already exists or not.
* Using AvlInsertOrReplace class you can return the node to add/replace.
*
* @param root the current root of the tree
* @param key the key for the node we are trying to insert
* @param keyComparator the comparator to use to match node and key
* @param insertOrReplace the class to use to insert or replace the node
* @return the new root of the tree
*/
public static <TNode extends AvlNode> TNode insert(TNode root, Object key,
final AvlKeyComparator<TNode> keyComparator,
final AvlInsertOrReplace<TNode> insertOrReplace) {
if (root == null) {
return insertOrReplace.insert(key);
}
int cmp = keyComparator.compareKey(root, key);
if (cmp < 0) {
root.avlLeft = insert((TNode)root.avlLeft, key, keyComparator, insertOrReplace);
} else if (cmp > 0) {
root.avlRight = insert((TNode)root.avlRight, key, keyComparator, insertOrReplace);
} else {
TNode left = (TNode)root.avlLeft;
TNode right = (TNode)root.avlRight;
root = insertOrReplace.replace(key, root);
root.avlLeft = left;
root.avlRight = right;
return root;
}
return balance(root);
}
private static <TNode extends AvlNode> TNode removeMin(TNode p) {
if (p.avlLeft == null)
return (TNode)p.avlRight;
p.avlLeft = removeMin(p.avlLeft);
return balance(p);
}
/**
* Removes the node matching the specified key from the tree
* @param root the current root of the tree
* @param key the key for the node we are trying to find
* @param keyComparator the comparator to use to match node and key
* @return the new root of the tree
*/
public static <TNode extends AvlNode> TNode remove(TNode root, Object key,
final AvlKeyComparator<TNode> keyComparator) {
return remove(root, key, keyComparator, null);
}
/**
* Removes the node matching the specified key from the tree
* @param root the current root of the tree
* @param key the key for the node we are trying to find
* @param keyComparator the comparator to use to match node and key
* @param removed will be set to true if the node was found and removed, otherwise false
* @return the new root of the tree
*/
public static <TNode extends AvlNode> TNode remove(TNode root, Object key,
final AvlKeyComparator<TNode> keyComparator, final AtomicBoolean removed) {
if (root == null) return null;
int cmp = keyComparator.compareKey(root, key);
if (cmp == 0) {
if (removed != null) removed.set(true);
TNode q = (TNode)root.avlLeft;
TNode r = (TNode)root.avlRight;
if (r == null) return q;
TNode min = getFirst(r);
min.avlRight = removeMin(r);
min.avlLeft = q;
return balance(min);
} else if (cmp > 0) {
root.avlLeft = remove((TNode)root.avlLeft, key, keyComparator);
} else /* if (cmp < 0) */ {
root.avlRight = remove((TNode)root.avlRight, key, keyComparator);
}
return balance(root);
}
/**
* Visit each node of the tree
* @param root the current root of the tree
* @param visitor the AvlNodeVisitor instance
*/
public static <TNode extends AvlNode> void visit(final TNode root,
final AvlNodeVisitor<TNode> visitor) {
if (root == null) return;
final AvlTreeIterator<TNode> iterator = new AvlTreeIterator<TNode>(root);
boolean visitNext = true;
while (visitNext && iterator.hasNext()) {
visitNext = visitor.visitNode(iterator.next());
}
}
private static <TNode extends AvlNode> TNode balance(TNode p) {
fixHeight(p);
int balance = balanceFactor(p);
if (balance == 2) {
if (balanceFactor(p.avlRight) < 0) {
p.avlRight = rotateRight(p.avlRight);
}
return rotateLeft(p);
} else if (balance == -2) {
if (balanceFactor(p.avlLeft) > 0) {
p.avlLeft = rotateLeft(p.avlLeft);
}
return rotateRight(p);
}
return p;
}
private static <TNode extends AvlNode> TNode rotateRight(TNode p) {
TNode q = (TNode)p.avlLeft;
p.avlLeft = q.avlRight;
q.avlRight = p;
fixHeight(p);
fixHeight(q);
return q;
}
private static <TNode extends AvlNode> TNode rotateLeft(TNode q) {
TNode p = (TNode)q.avlRight;
q.avlRight = p.avlLeft;
p.avlLeft = q;
fixHeight(q);
fixHeight(p);
return p;
}
private static <TNode extends AvlNode> void fixHeight(TNode node) {
final int heightLeft = height(node.avlLeft);
final int heightRight = height(node.avlRight);
node.avlHeight = 1 + Math.max(heightLeft, heightRight);
}
private static <TNode extends AvlNode> int height(TNode node) {
return node != null ? node.avlHeight : 0;
}
private static <TNode extends AvlNode> int balanceFactor(TNode node) {
return height(node.avlRight) - height(node.avlLeft);
}
}
/**
* Iterator for the AvlTree
*/
@InterfaceAudience.Private
public static class AvlTreeIterator<TNode extends AvlNode> implements Iterator<TNode> {
private final Object[] stack = new Object[64];
private TNode current = null;
private int height = 0;
public AvlTreeIterator() {
}
/**
* Create the iterator starting from the first (min) node of the tree
*/
public AvlTreeIterator(final TNode root) {
seekFirst(root);
}
/**
* Create the iterator starting from the specified key
* @param root the current root of the tree
* @param key the key for the node we are trying to find
* @param keyComparator the comparator to use to match node and key
*/
public AvlTreeIterator(final TNode root, final Object key,
final AvlKeyComparator<TNode> keyComparator) {
seekTo(root, key, keyComparator);
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public TNode next() {
final TNode node = this.current;
seekNext();
return node;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
/**
* Reset the iterator, and seeks to the first (min) node of the tree
* @param root the current root of the tree
*/
public void seekFirst(final TNode root) {
current = root;
height = 0;
if (root != null) {
while (current.avlLeft != null) {
stack[height++] = current;
current = (TNode)current.avlLeft;
}
}
}
/**
* Reset the iterator, and seeks to the specified key
* @param root the current root of the tree
* @param key the key for the node we are trying to find
* @param keyComparator the comparator to use to match node and key
*/
public void seekTo(final TNode root, final Object key,
final AvlKeyComparator<TNode> keyComparator) {
current = null;
height = 0;
TNode node = root;
while (node != null) {
if (keyComparator.compareKey(node, key) >= 0) {
if (node.avlLeft != null) {
stack[height++] = node;
node = (TNode)node.avlLeft;
} else {
current = node;
return;
}
} else {
if (node.avlRight != null) {
stack[height++] = node;
node = (TNode)node.avlRight;
} else {
if (height > 0) {
TNode parent = (TNode)stack[--height];
while (node == parent.avlRight) {
if (height == 0) {
current = null;
return;
}
node = parent;
parent = (TNode)stack[--height];
}
current = parent;
return;
}
current = null;
return;
}
}
}
}
private void seekNext() {
if (current == null) return;
if (current.avlRight != null) {
stack[height++] = current;
current = (TNode)current.avlRight;
while (current.avlLeft != null) {
stack[height++] = current;
current = (TNode)current.avlLeft;
}
} else {
TNode node;
do {
if (height == 0) {
current = null;
return;
}
node = current;
current = (TNode)stack[--height];
} while (current.avlRight == node);
}
}
}
/**
* Helper class that allows to create and manipulate a linked list of AvlLinkedNodes
*/
@InterfaceAudience.Private
public static class AvlIterableList {
/**
* @param node the current node
* @return the successor of the current node
*/
public static <TNode extends AvlLinkedNode> TNode readNext(TNode node) {
return (TNode)node.iterNext;
}
/**
* @param node the current node
* @return the predecessor of the current node
*/
public static <TNode extends AvlLinkedNode> TNode readPrev(TNode node) {
return (TNode)node.iterPrev;
}
/**
* @param head the head of the linked list
* @param node the node to add to the front of the list
* @return the new head of the list
*/
public static <TNode extends AvlLinkedNode> TNode prepend(TNode head, TNode node) {
assert !isLinked(node) : node + " is already linked";
if (head != null) {
TNode tail = (TNode)head.iterPrev;
tail.iterNext = node;
head.iterPrev = node;
node.iterNext = head;
node.iterPrev = tail;
} else {
node.iterNext = node;
node.iterPrev = node;
}
return node;
}
/**
* @param head the head of the linked list
* @param node the node to add to the tail of the list
* @return the new head of the list
*/
public static <TNode extends AvlLinkedNode> TNode append(TNode head, TNode node) {
assert !isLinked(node) : node + " is already linked";
if (head != null) {
TNode tail = (TNode)head.iterPrev;
tail.iterNext = node;
node.iterNext = head;
node.iterPrev = tail;
head.iterPrev = node;
return head;
}
node.iterNext = node;
node.iterPrev = node;
return node;
}
/**
* @param head the head of the current linked list
* @param otherHead the head of the list to append to the current list
* @return the new head of the current list
*/
public static <TNode extends AvlLinkedNode> TNode appendList(TNode head, TNode otherHead) {
if (head == null) return otherHead;
if (otherHead == null) return head;
TNode tail = (TNode)head.iterPrev;
TNode otherTail = (TNode)otherHead.iterPrev;
tail.iterNext = otherHead;
otherHead.iterPrev = tail;
otherTail.iterNext = head;
head.iterPrev = otherTail;
return head;
}
/**
* @param head the head of the linked list
* @param node the node to remove from the list
* @return the new head of the list
*/
public static <TNode extends AvlLinkedNode> TNode remove(TNode head, TNode node) {
assert isLinked(node) : node + " is not linked";
if (node != node.iterNext) {
node.iterPrev.iterNext = node.iterNext;
node.iterNext.iterPrev = node.iterPrev;
head = (head == node) ? (TNode)node.iterNext : head;
} else {
head = null;
}
node.iterNext = null;
node.iterPrev = null;
return head;
}
/**
* @param node the node to check
* @return true if the node is linked to a list, false otherwise
*/
public static <TNode extends AvlLinkedNode> boolean isLinked(TNode node) {
return node.iterPrev != null && node.iterNext != null;
}
}
}

View File

@ -0,0 +1,261 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hadoop.hbase.util;
import java.util.Random;
import java.util.TreeMap;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
import org.apache.hadoop.hbase.util.AvlUtil.AvlNode;
import org.apache.hadoop.hbase.util.AvlUtil.AvlNodeVisitor;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@Category({MiscTests.class, SmallTests.class})
public class TestAvlUtil {
private static final TestAvlKeyComparator KEY_COMPARATOR = new TestAvlKeyComparator();
@Test
public void testAvlTreeCrud() {
final int MAX_KEY = 99999999;
final int NELEM = 10000;
final TreeMap<Integer, Object> treeMap = new TreeMap<Integer, Object>();
TestAvlNode root = null;
final Random rand = new Random();
for (int i = 0; i < NELEM; ++i) {
int key = rand.nextInt(MAX_KEY);
if (AvlTree.get(root, key, KEY_COMPARATOR) != null) {
i--;
continue;
}
root = AvlTree.insert(root, new TestAvlNode(key));
treeMap.put(key, null);
for (Integer keyX: treeMap.keySet()) {
TestAvlNode node = AvlTree.get(root, keyX, KEY_COMPARATOR);
assertNotNull(node);
assertEquals(keyX.intValue(), node.getKey());
}
}
for (int i = 0; i < NELEM; ++i) {
int key = rand.nextInt(MAX_KEY);
TestAvlNode node = AvlTree.get(root, key, KEY_COMPARATOR);
if (!treeMap.containsKey(key)) {
assert node == null;
continue;
}
treeMap.remove(key);
assertEquals(key, node.getKey());
root = AvlTree.remove(root, key, KEY_COMPARATOR);
for (Integer keyX: treeMap.keySet()) {
node = AvlTree.get(root, keyX, KEY_COMPARATOR);
assertNotNull(node);
assertEquals(keyX.intValue(), node.getKey());
}
}
}
@Test
public void testAvlTreeVisitor() {
final int MIN_KEY = 0;
final int MAX_KEY = 50;
TestAvlNode root = null;
for (int i = MAX_KEY; i >= MIN_KEY; --i) {
root = AvlTree.insert(root, new TestAvlNode(i));
}
AvlTree.visit(root, new AvlNodeVisitor<TestAvlNode>() {
private int prevKey = -1;
public boolean visitNode(TestAvlNode node) {
assertEquals(prevKey, node.getKey() - 1);
assertTrue(node.getKey() >= MIN_KEY);
assertTrue(node.getKey() <= MAX_KEY);
prevKey = node.getKey();
return node.getKey() <= MAX_KEY;
}
});
}
@Test
public void testAvlTreeIterSeekFirst() {
final int MIN_KEY = 1;
final int MAX_KEY = 50;
TestAvlNode root = null;
for (int i = MIN_KEY; i < MAX_KEY; ++i) {
root = AvlTree.insert(root, new TestAvlNode(i));
}
AvlTreeIterator<TestAvlNode> iter = new AvlTreeIterator<TestAvlNode>(root);
assertTrue(iter.hasNext());
long prevKey = 0;
while (iter.hasNext()) {
TestAvlNode node = iter.next();
assertEquals(prevKey + 1, node.getKey());
prevKey = node.getKey();
}
assertEquals(MAX_KEY - 1, prevKey);
}
@Test
public void testAvlTreeIterSeekTo() {
final int MIN_KEY = 1;
final int MAX_KEY = 50;
TestAvlNode root = null;
for (int i = MIN_KEY; i < MAX_KEY; i += 2) {
root = AvlTree.insert(root, new TestAvlNode(i));
}
for (int i = MIN_KEY - 1; i < MAX_KEY + 1; ++i) {
AvlTreeIterator<TestAvlNode> iter = new AvlTreeIterator<TestAvlNode>(root, i, KEY_COMPARATOR);
if (i < MAX_KEY) {
assertTrue(iter.hasNext());
} else {
// searching for something greater than the last node
assertFalse(iter.hasNext());
break;
}
TestAvlNode node = iter.next();
assertEquals((i % 2 == 0) ? i + 1 : i, node.getKey());
long prevKey = node.getKey();
while (iter.hasNext()) {
node = iter.next();
assertTrue(node.getKey() > prevKey);
prevKey = node.getKey();
}
}
}
@Test
public void testAvlIterableListCrud() {
final int NITEMS = 10;
TestLinkedAvlNode prependHead = null;
TestLinkedAvlNode appendHead = null;
// prepend()/append()
for (int i = 0; i <= NITEMS; ++i) {
TestLinkedAvlNode pNode = new TestLinkedAvlNode(i);
assertFalse(AvlIterableList.isLinked(pNode));
prependHead = AvlIterableList.prepend(prependHead, pNode);
assertTrue(AvlIterableList.isLinked(pNode));
TestLinkedAvlNode aNode = new TestLinkedAvlNode(i);
assertFalse(AvlIterableList.isLinked(aNode));
appendHead = AvlIterableList.append(appendHead, aNode);
assertTrue(AvlIterableList.isLinked(aNode));
}
// readNext()
TestLinkedAvlNode pNode = prependHead;
TestLinkedAvlNode aNode = appendHead;
for (int i = 0; i <= NITEMS; ++i) {
assertEquals(NITEMS - i, pNode.getKey());
pNode = AvlIterableList.readNext(pNode);
assertEquals(i, aNode.getKey());
aNode = AvlIterableList.readNext(aNode);
}
// readPrev()
pNode = AvlIterableList.readPrev(prependHead);
aNode = AvlIterableList.readPrev(appendHead);
for (int i = 0; i <= NITEMS; ++i) {
assertEquals(i, pNode.getKey());
pNode = AvlIterableList.readPrev(pNode);
assertEquals(NITEMS - i, aNode.getKey());
aNode = AvlIterableList.readPrev(aNode);
}
// appendList()
TestLinkedAvlNode node = AvlIterableList.appendList(prependHead, appendHead);
for (int i = NITEMS; i >= 0; --i) {
assertEquals(i, node.getKey());
node = AvlIterableList.readNext(node);
}
for (int i = 0; i <= NITEMS; ++i) {
assertEquals(i, node.getKey());
node = AvlIterableList.readNext(node);
}
}
private static class TestAvlNode extends AvlNode<TestAvlNode> {
private final int key;
public TestAvlNode(int key) {
this.key = key;
}
public int getKey() {
return key;
}
@Override
public int compareTo(TestAvlNode other) {
return this.key - other.key;
}
@Override
public String toString() {
return String.format("TestAvlNode(%d)", key);
}
}
private static class TestLinkedAvlNode extends AvlLinkedNode<TestLinkedAvlNode> {
private final int key;
public TestLinkedAvlNode(int key) {
this.key = key;
}
public int getKey() {
return key;
}
@Override
public int compareTo(TestLinkedAvlNode other) {
return this.key - other.key;
}
@Override
public String toString() {
return String.format("TestLinkedAvlNode(%d)", key);
}
}
private static class TestAvlKeyComparator implements AvlKeyComparator<TestAvlNode> {
public int compareKey(TestAvlNode node, Object key) {
return node.getKey() - (int)key;
}
}
}

View File

@ -40,6 +40,10 @@ import org.apache.hadoop.hbase.master.TableLockManager.TableLock;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet;
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
/**
* ProcedureRunnableSet for the Master Procedures.
@ -65,13 +69,20 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private final ReentrantLock schedLock = new ReentrantLock();
private final Condition schedWaitCond = schedLock.newCondition();
private final static NamespaceQueueKeyComparator NAMESPACE_QUEUE_KEY_COMPARATOR =
new NamespaceQueueKeyComparator();
private final static ServerQueueKeyComparator SERVER_QUEUE_KEY_COMPARATOR =
new ServerQueueKeyComparator();
private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR =
new TableQueueKeyComparator();
private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
private int queueSize = 0;
private final Object[] serverBuckets = new Object[128];
private Queue<String> namespaceMap = null;
private Queue<TableName> tableMap = null;
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private NamespaceQueue namespaceMap = null;
private TableQueue tableMap = null;
private final int metaTablePriority;
private final int userTablePriority;
@ -142,7 +153,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// the queue is not suspended or removed from the fairq (run-queue)
// because someone has an xlock on it.
// so, if the queue is not-linked we should add it
if (queue.size() == 1 && !IterableList.isLinked(queue)) {
if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) {
fairq.add(queue);
}
queueSize++;
@ -152,7 +163,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// our (proc) parent has the xlock,
// so the queue is not in the fairq (run-queue)
// add it back to let the child run (inherit the lock)
if (!IterableList.isLinked(queue)) {
if (!AvlIterableList.isLinked(queue)) {
fairq.add(queue);
}
queueSize++;
@ -230,12 +241,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
try {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
clear((ServerQueue)serverBuckets[i], serverRunQueue);
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
// Remove Tables
clear(tableMap, tableRunQueue);
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
assert queueSize == 0 : "expected queue size to be 0, got " + queueSize;
@ -244,11 +255,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) {
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
final FairQueue<T> fairq, final AvlKeyComparator<TNode> comparator) {
while (treeMap != null) {
Queue<T> node = AvlTree.getFirst(treeMap);
assert !node.isSuspended() : "can't clear suspended " + node.getKey();
treeMap = AvlTree.remove(treeMap, node.getKey());
treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
removeFromRunQueue(fairq, node);
}
}
@ -302,7 +314,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) {
if (IterableList.isLinked(queue)) return;
if (AvlIterableList.isLinked(queue)) return;
if (!queue.isEmpty()) {
fairq.add(queue);
queueSize += queue.size();
@ -310,7 +322,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
if (!IterableList.isLinked(queue)) return;
if (!AvlIterableList.isLinked(queue)) return;
fairq.remove(queue);
queueSize -= queue.size();
}
@ -507,11 +519,11 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private void suspendTableQueue(Queue<TableName> queue) {
waitingTables = IterableList.append(waitingTables, queue);
waitingTables = AvlIterableList.append(waitingTables, queue);
}
private void suspendServerQueue(Queue<ServerName> queue) {
waitingServers = IterableList.append(waitingServers, queue);
waitingServers = AvlIterableList.append(waitingServers, queue);
}
private boolean hasWaitingTables() {
@ -520,7 +532,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private Queue<TableName> popWaitingTable() {
Queue<TableName> node = waitingTables;
waitingTables = IterableList.remove(waitingTables, node);
waitingTables = AvlIterableList.remove(waitingTables, node);
node.setSuspended(false);
return node;
}
@ -531,7 +543,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private Queue<ServerName> popWaitingServer() {
Queue<ServerName> node = waitingServers;
waitingServers = IterableList.remove(waitingServers, node);
waitingServers = AvlIterableList.remove(waitingServers, node);
node.setSuspended(false);
return node;
}
@ -555,17 +567,17 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private TableQueue getTableQueue(TableName tableName) {
Queue<TableName> node = AvlTree.get(tableMap, tableName);
if (node != null) return (TableQueue)node;
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
if (node != null) return node;
NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString());
node = new TableQueue(tableName, nsQueue, getTablePriority(tableName));
tableMap = AvlTree.insert(tableMap, node);
return (TableQueue)node;
return node;
}
private void removeTableQueue(TableName tableName) {
tableMap = AvlTree.remove(tableMap, tableName);
tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
}
private int getTablePriority(TableName tableName) {
@ -589,12 +601,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// Namespace Queue Lookup Helpers
// ============================================================================
private NamespaceQueue getNamespaceQueue(String namespace) {
Queue<String> node = AvlTree.get(namespaceMap, namespace);
NamespaceQueue node = AvlTree.get(namespaceMap, namespace, NAMESPACE_QUEUE_KEY_COMPARATOR);
if (node != null) return (NamespaceQueue)node;
node = new NamespaceQueue(namespace);
namespaceMap = AvlTree.insert(namespaceMap, node);
return (NamespaceQueue)node;
return node;
}
// ============================================================================
@ -610,24 +622,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
private ServerQueue getServerQueue(ServerName serverName) {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
Queue<ServerName> root = getTreeRoot(serverBuckets, index);
Queue<ServerName> node = AvlTree.get(root, serverName);
if (node != null) return (ServerQueue)node;
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
if (node != null) return node;
node = new ServerQueue(serverName);
serverBuckets[index] = AvlTree.insert(root, node);
serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
return (ServerQueue)node;
}
private void removeServerQueue(ServerName serverName) {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName);
}
@SuppressWarnings("unchecked")
private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) {
return (Queue<T>) buckets[index];
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
final ServerQueue root = serverBuckets[index];
serverBuckets[index] = AvlTree.remove(root, serverName, SERVER_QUEUE_KEY_COMPARATOR);
}
private static int getBucketIndex(Object[] buckets, int hashCode) {
@ -645,6 +652,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// ============================================================================
// Table and Server Queue Implementation
// ============================================================================
private static class ServerQueueKeyComparator implements AvlKeyComparator<ServerQueue> {
@Override
public int compareKey(ServerQueue node, Object key) {
return node.compareKey((ServerName)key);
}
}
public static class ServerQueue extends QueueImpl<ServerName> {
public ServerQueue(ServerName serverName) {
super(serverName);
@ -699,6 +713,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private static class TableQueueKeyComparator implements AvlKeyComparator<TableQueue> {
@Override
public int compareKey(TableQueue node, Object key) {
return node.compareKey((TableName)key);
}
}
public static class TableQueue extends QueueImpl<TableName> {
private final NamespaceQueue namespaceQueue;
@ -852,6 +873,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
}
private static class NamespaceQueueKeyComparator implements AvlKeyComparator<NamespaceQueue> {
@Override
public int compareKey(NamespaceQueue node, Object key) {
return node.compareKey((String)key);
}
}
/**
* the namespace is currently used just as a rwlock, not as a queue.
* because ns operation are not frequent enough. so we want to avoid
@ -1024,7 +1052,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (queue.isEmpty() && queue.tryExclusiveLock(0)) {
// remove the table from the run-queue and the map
if (IterableList.isLinked(queue)) {
if (AvlIterableList.isLinked(queue)) {
tableRunQueue.remove(queue);
}
@ -1268,13 +1296,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
boolean isSuspended();
}
private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface {
private Queue<TKey> avlRight = null;
private Queue<TKey> avlLeft = null;
private int avlHeight = 1;
private Queue<TKey> iterNext = null;
private Queue<TKey> iterPrev = null;
private static abstract class Queue<TKey extends Comparable<TKey>>
extends AvlLinkedNode<Queue<TKey>> implements QueueInterface {
private boolean suspended = false;
private long exclusiveLockProcIdOwner = Long.MIN_VALUE;
@ -1366,6 +1389,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return key.compareTo(cmpKey);
}
@Override
public int compareTo(Queue<TKey> other) {
return compareKey(other.key);
}
@ -1441,13 +1465,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
}
public void add(Queue<T> queue) {
queueHead = IterableList.append(queueHead, queue);
queueHead = AvlIterableList.append(queueHead, queue);
if (currentQueue == null) setNextQueue(queueHead);
}
public void remove(Queue<T> queue) {
Queue<T> nextQueue = queue.iterNext;
queueHead = IterableList.remove(queueHead, queue);
Queue<T> nextQueue = AvlIterableList.readNext(queue);
queueHead = AvlIterableList.remove(queueHead, queue);
if (currentQueue == queue) {
setNextQueue(queueHead != null ? nextQueue : null);
}
@ -1478,7 +1502,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private boolean nextQueue() {
if (currentQueue == null) return false;
currentQueue = currentQueue.iterNext;
currentQueue = AvlIterableList.readNext(currentQueue);
return currentQueue != null;
}
@ -1495,187 +1519,4 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return Math.max(1, queue.getPriority() * quantum); // TODO
}
}
private static class AvlTree {
public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) {
while (root != null) {
int cmp = root.compareKey(key);
if (cmp > 0) {
root = root.avlLeft;
} else if (cmp < 0) {
root = root.avlRight;
} else {
return root;
}
}
return null;
}
public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) {
if (root != null) {
while (root.avlLeft != null) {
root = root.avlLeft;
}
}
return root;
}
public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) {
if (root != null) {
while (root.avlRight != null) {
root = root.avlRight;
}
}
return root;
}
public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) {
if (root == null) return node;
if (node.compareTo(root) < 0) {
root.avlLeft = insert(root.avlLeft, node);
} else {
root.avlRight = insert(root.avlRight, node);
}
return balance(root);
}
private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) {
if (p.avlLeft == null)
return p.avlRight;
p.avlLeft = removeMin(p.avlLeft);
return balance(p);
}
public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) {
if (root == null) return null;
int cmp = root.compareKey(key);
if (cmp == 0) {
Queue<T> q = root.avlLeft;
Queue<T> r = root.avlRight;
if (r == null) return q;
Queue<T> min = getFirst(r);
min.avlRight = removeMin(r);
min.avlLeft = q;
return balance(min);
} else if (cmp > 0) {
root.avlLeft = remove(root.avlLeft, key);
} else /* if (cmp < 0) */ {
root.avlRight = remove(root.avlRight, key);
}
return balance(root);
}
private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) {
fixHeight(p);
int balance = balanceFactor(p);
if (balance == 2) {
if (balanceFactor(p.avlRight) < 0) {
p.avlRight = rotateRight(p.avlRight);
}
return rotateLeft(p);
} else if (balance == -2) {
if (balanceFactor(p.avlLeft) > 0) {
p.avlLeft = rotateLeft(p.avlLeft);
}
return rotateRight(p);
}
return p;
}
private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) {
Queue<T> q = p.avlLeft;
p.avlLeft = q.avlRight;
q.avlRight = p;
fixHeight(p);
fixHeight(q);
return q;
}
private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) {
Queue<T> p = q.avlRight;
q.avlRight = p.avlLeft;
p.avlLeft = q;
fixHeight(q);
fixHeight(p);
return p;
}
private static <T extends Comparable<T>> void fixHeight(Queue<T> node) {
int heightLeft = height(node.avlLeft);
int heightRight = height(node.avlRight);
node.avlHeight = 1 + Math.max(heightLeft, heightRight);
}
private static <T extends Comparable<T>> int height(Queue<T> node) {
return node != null ? node.avlHeight : 0;
}
private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) {
return height(node.avlRight) - height(node.avlLeft);
}
}
private static class IterableList {
public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) {
assert !isLinked(node) : node + " is already linked";
if (head != null) {
Queue<T> tail = head.iterPrev;
tail.iterNext = node;
head.iterPrev = node;
node.iterNext = head;
node.iterPrev = tail;
} else {
node.iterNext = node;
node.iterPrev = node;
}
return node;
}
public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) {
assert !isLinked(node) : node + " is already linked";
if (head != null) {
Queue<T> tail = head.iterPrev;
tail.iterNext = node;
node.iterNext = head;
node.iterPrev = tail;
head.iterPrev = node;
return head;
}
node.iterNext = node;
node.iterPrev = node;
return node;
}
public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) {
if (head == null) return otherHead;
if (otherHead == null) return head;
Queue<T> tail = head.iterPrev;
Queue<T> otherTail = otherHead.iterPrev;
tail.iterNext = otherHead;
otherHead.iterPrev = tail;
otherTail.iterNext = head;
head.iterPrev = otherTail;
return head;
}
private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) {
assert isLinked(node) : node + " is not linked";
if (node != node.iterNext) {
node.iterPrev.iterNext = node.iterNext;
node.iterNext.iterPrev = node.iterPrev;
head = (head == node) ? node.iterNext : head;
} else {
head = null;
}
node.iterNext = null;
node.iterPrev = null;
return head;
}
private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) {
return node.iterPrev != null && node.iterNext != null;
}
}
}