HBASE-16695 Procedure v2 - Support for parent holding locks

This commit is contained in:
Matteo Bertozzi 2016-09-26 08:42:48 -07:00
parent e01e05cc0e
commit 8da0500e7d
5 changed files with 596 additions and 266 deletions

View File

@ -908,4 +908,16 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> {
return proc; return proc;
} }
/**
* @param a the first procedure to be compared.
* @param b the second procedure to be compared.
* @return true if the two procedures have the same parent
*/
public static boolean haveSameParent(final Procedure a, final Procedure b) {
if (a.hasParent() && b.hasParent()) {
return a.getParentProcId() == b.getParentProcId();
}
return false;
}
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.procedure2; package org.apache.hadoop.hbase.procedure2;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.classification.InterfaceStability;
@ -69,6 +71,7 @@ public interface ProcedureRunnableSet {
* Returns the number of elements in this collection. * Returns the number of elements in this collection.
* @return the number of elements in this collection. * @return the number of elements in this collection.
*/ */
@VisibleForTesting
int size(); int size();
/** /**

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.master.procedure; package org.apache.hadoop.hbase.master.procedure;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
import java.util.Arrays; import java.util.Arrays;
@ -44,6 +46,7 @@ import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList; import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode; import org.apache.hadoop.hbase.util.AvlUtil.AvlLinkedNode;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree; import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
/** /**
* ProcedureRunnableSet for the Master Procedures. * ProcedureRunnableSet for the Master Procedures.
@ -78,7 +81,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>(); private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>(); private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>();
private int queueSize = 0;
private final ServerQueue[] serverBuckets = new ServerQueue[128]; private final ServerQueue[] serverBuckets = new ServerQueue[128];
private NamespaceQueue namespaceMap = null; private NamespaceQueue namespaceMap = null;
@ -148,14 +150,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (proc.isSuspended()) return; if (proc.isSuspended()) return;
queue.add(proc, addFront); queue.add(proc, addFront);
if (!(queue.isSuspended() || queue.hasExclusiveLock())) { if (!(queue.isSuspended() ||
(queue.hasExclusiveLock() && !queue.isLockOwner(proc.getProcId())))) {
// the queue is not suspended or removed from the fairq (run-queue) // the queue is not suspended or removed from the fairq (run-queue)
// because someone has an xlock on it. // because someone has an xlock on it.
// so, if the queue is not-linked we should add it // so, if the queue is not-linked we should add it
if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) { if (queue.size() == 1 && !AvlIterableList.isLinked(queue)) {
fairq.add(queue); fairq.add(queue);
} }
queueSize++;
} else if (queue.hasParentLock(proc)) { } else if (queue.hasParentLock(proc)) {
assert addFront : "expected to add a child in the front"; assert addFront : "expected to add a child in the front";
assert !queue.isSuspended() : "unexpected suspended state for the queue"; assert !queue.isSuspended() : "unexpected suspended state for the queue";
@ -165,7 +167,6 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!AvlIterableList.isLinked(queue)) { if (!AvlIterableList.isLinked(queue)) {
fairq.add(queue); fairq.add(queue);
} }
queueSize++;
} }
} }
@ -179,13 +180,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
Procedure pollResult = null; Procedure pollResult = null;
schedLock.lock(); schedLock.lock();
try { try {
if (queueSize == 0) { if (!hasRunnables()) {
if (waitNsec < 0) { if (waitNsec < 0) {
schedWaitCond.await(); schedWaitCond.await();
} else { } else {
schedWaitCond.awaitNanos(waitNsec); schedWaitCond.awaitNanos(waitNsec);
} }
if (queueSize == 0) { if (!hasRunnables()) {
return null; return null;
} }
} }
@ -209,6 +210,10 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return pollResult; return pollResult;
} }
private boolean hasRunnables() {
return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables();
}
private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) { private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) {
final Queue<T> rq = fairq.poll(); final Queue<T> rq = fairq.poll();
if (rq == null || !rq.isAvailable()) { if (rq == null || !rq.isAvailable()) {
@ -218,13 +223,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
assert !rq.isSuspended() : "rq=" + rq + " is suspended"; assert !rq.isSuspended() : "rq=" + rq + " is suspended";
final Procedure pollResult = rq.peek(); final Procedure pollResult = rq.peek();
final boolean xlockReq = rq.requireExclusiveLock(pollResult); final boolean xlockReq = rq.requireExclusiveLock(pollResult);
if (xlockReq && rq.isLocked() && !rq.hasParentLock(pollResult)) { if (xlockReq && rq.isLocked() && !rq.hasLockAccess(pollResult)) {
// someone is already holding the lock (e.g. shared lock). avoid a yield // someone is already holding the lock (e.g. shared lock). avoid a yield
return null; return null;
} }
rq.poll(); rq.poll();
this.queueSize--;
if (rq.isEmpty() || xlockReq) { if (rq.isEmpty() || xlockReq) {
removeFromRunQueue(fairq, rq); removeFromRunQueue(fairq, rq);
} else if (rq.hasParentLock(pollResult)) { } else if (rq.hasParentLock(pollResult)) {
@ -232,7 +236,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
// check if the next procedure is still a child. // check if the next procedure is still a child.
// if not, remove the rq from the fairq and go back to the xlock state // if not, remove the rq from the fairq and go back to the xlock state
Procedure nextProc = rq.peek(); Procedure nextProc = rq.peek();
if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) { if (nextProc != null && !Procedure.haveSameParent(nextProc, pollResult)) {
removeFromRunQueue(fairq, rq); removeFromRunQueue(fairq, rq);
} }
} }
@ -255,7 +259,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR); clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null; tableMap = null;
assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; assert size() == 0 : "expected queue size to be 0, got " + size();
} finally { } finally {
schedLock.unlock(); schedLock.unlock();
} }
@ -271,6 +275,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
} }
} }
private void wakePollIfNeeded(final int waitingCount) {
if (waitingCount > 1) {
schedWaitCond.signalAll();
} else if (waitingCount > 0) {
schedWaitCond.signal();
}
}
@Override @Override
public void signalAll() { public void signalAll() {
schedLock.lock(); schedLock.lock();
@ -285,14 +297,30 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
public int size() { public int size() {
schedLock.lock(); schedLock.lock();
try { try {
return queueSize; int count = 0;
// Server queues
final AvlTreeIterator<ServerQueue> serverIter = new AvlTreeIterator<ServerQueue>();
for (int i = 0; i < serverBuckets.length; ++i) {
serverIter.seekFirst(serverBuckets[i]);
while (serverIter.hasNext()) {
count += serverIter.next().size();
}
}
// Table queues
final AvlTreeIterator<TableQueue> tableIter = new AvlTreeIterator<TableQueue>(tableMap);
while (tableIter.hasNext()) {
count += tableIter.next().size();
}
return count;
} finally { } finally {
schedLock.unlock(); schedLock.unlock();
} }
} }
@Override @Override
public void completionCleanup(Procedure proc) { public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) { if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface)proc; TableProcedureInterface iProcTable = (TableProcedureInterface)proc;
boolean tableDeleted; boolean tableDeleted;
@ -310,7 +338,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
} }
if (tableDeleted) { if (tableDeleted) {
markTableAsDeleted(iProcTable.getTableName()); markTableAsDeleted(iProcTable.getTableName(), proc);
return; return;
} }
} else { } else {
@ -323,14 +351,12 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (AvlIterableList.isLinked(queue)) return; if (AvlIterableList.isLinked(queue)) return;
if (!queue.isEmpty()) { if (!queue.isEmpty()) {
fairq.add(queue); fairq.add(queue);
queueSize += queue.size();
} }
} }
private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) { private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) {
if (!AvlIterableList.isLinked(queue)) return; if (!AvlIterableList.isLinked(queue)) return;
fairq.remove(queue); fairq.remove(queue);
queueSize -= queue.size();
} }
// ============================================================================ // ============================================================================
@ -470,13 +496,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock(); schedLock.lock();
try { try {
popEventWaitingObjects(event); final int waitingCount = popEventWaitingObjects(event);
wakePollIfNeeded(waitingCount);
if (queueSize > 1) {
schedWaitCond.signalAll();
} else if (queueSize > 0) {
schedWaitCond.signal();
}
} finally { } finally {
schedLock.unlock(); schedLock.unlock();
} }
@ -493,6 +514,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
final boolean isTraceEnabled = LOG.isTraceEnabled(); final boolean isTraceEnabled = LOG.isTraceEnabled();
schedLock.lock(); schedLock.lock();
try { try {
int waitingCount = 0;
for (int i = 0; i < count; ++i) { for (int i = 0; i < count; ++i) {
final ProcedureEvent event = events[i]; final ProcedureEvent event = events[i];
synchronized (event) { synchronized (event) {
@ -500,36 +522,36 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (isTraceEnabled) { if (isTraceEnabled) {
LOG.trace("Wake event " + event); LOG.trace("Wake event " + event);
} }
popEventWaitingObjects(event); waitingCount += popEventWaitingObjects(event);
} }
} }
wakePollIfNeeded(waitingCount);
if (queueSize > 1) {
schedWaitCond.signalAll();
} else if (queueSize > 0) {
schedWaitCond.signal();
}
} finally { } finally {
schedLock.unlock(); schedLock.unlock();
} }
} }
private void popEventWaitingObjects(final ProcedureEvent event) { private int popEventWaitingObjects(final ProcedureEvent event) {
int count = 0;
while (event.hasWaitingTables()) { while (event.hasWaitingTables()) {
final Queue<TableName> queue = event.popWaitingTable(); final Queue<TableName> queue = event.popWaitingTable();
queue.setSuspended(false); queue.setSuspended(false);
addToRunQueue(tableRunQueue, queue); addToRunQueue(tableRunQueue, queue);
count += queue.size();
} }
// TODO: This will change once we have the new AM // TODO: This will change once we have the new AM
while (event.hasWaitingServers()) { while (event.hasWaitingServers()) {
final Queue<ServerName> queue = event.popWaitingServer(); final Queue<ServerName> queue = event.popWaitingServer();
queue.setSuspended(false); queue.setSuspended(false);
addToRunQueue(serverRunQueue, queue); addToRunQueue(serverRunQueue, queue);
count += queue.size();
} }
while (event.hasWaitingProcedures()) { while (event.hasWaitingProcedures()) {
wakeProcedure(event.popWaitingProcedure(false)); wakeProcedure(event.popWaitingProcedure(false));
count++;
} }
return count;
} }
private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) { private void suspendProcedure(final BaseProcedureEvent event, final Procedure procedure) {
@ -823,8 +845,8 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (hasExclusiveLock()) { if (hasExclusiveLock()) {
// if we have an exclusive lock already taken // if we have an exclusive lock already taken
// only child of the lock owner can be executed // only child of the lock owner can be executed
Procedure availProc = peek(); final Procedure nextProc = peek();
return availProc != null && hasParentLock(availProc); return nextProc != null && hasLockAccess(nextProc);
} }
// no xlock // no xlock
@ -1011,27 +1033,32 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock(); schedLock.lock();
TableQueue queue = getTableQueue(table); TableQueue queue = getTableQueue(table);
if (!queue.getNamespaceQueue().trySharedLock()) { if (!queue.getNamespaceQueue().trySharedLock()) {
schedLock.unlock();
return false; return false;
} }
if (!queue.tryExclusiveLock(procedure.getProcId())) { if (!queue.tryExclusiveLock(procedure)) {
queue.getNamespaceQueue().releaseSharedLock(); queue.getNamespaceQueue().releaseSharedLock();
schedLock.unlock(); schedLock.unlock();
return false; return false;
} }
removeFromRunQueue(tableRunQueue, queue); removeFromRunQueue(tableRunQueue, queue);
boolean hasParentLock = queue.hasParentLock(procedure);
schedLock.unlock(); schedLock.unlock();
boolean hasXLock = true;
if (!hasParentLock) {
// Zk lock is expensive... // Zk lock is expensive...
boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString());
if (!hasXLock) { if (!hasXLock) {
schedLock.lock(); schedLock.lock();
queue.releaseExclusiveLock(); if (!hasParentLock) queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock(); queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue); addToRunQueue(tableRunQueue, queue);
schedLock.unlock(); schedLock.unlock();
} }
}
return hasXLock; return hasXLock;
} }
@ -1041,15 +1068,16 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* @param table the name of the table that has the exclusive lock * @param table the name of the table that has the exclusive lock
*/ */
public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) { public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) {
schedLock.lock(); final TableQueue queue = getTableQueueWithLock(table);
TableQueue queue = getTableQueue(table); final boolean hasParentLock = queue.hasParentLock(procedure);
schedLock.unlock();
if (!hasParentLock) {
// Zk lock is expensive... // Zk lock is expensive...
queue.releaseZkExclusiveLock(lockManager); queue.releaseZkExclusiveLock(lockManager);
}
schedLock.lock(); schedLock.lock();
queue.releaseExclusiveLock(); if (!hasParentLock) queue.releaseExclusiveLock();
queue.getNamespaceQueue().releaseSharedLock(); queue.getNamespaceQueue().releaseSharedLock();
addToRunQueue(tableRunQueue, queue); addToRunQueue(tableRunQueue, queue);
schedLock.unlock(); schedLock.unlock();
@ -1116,17 +1144,19 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
* If there are new operations pending (e.g. a new create), * If there are new operations pending (e.g. a new create),
* the remove will not be performed. * the remove will not be performed.
* @param table the name of the table that should be marked as deleted * @param table the name of the table that should be marked as deleted
* @param procedure the procedure that is removing the table
* @return true if deletion succeeded, false otherwise meaning that there are * @return true if deletion succeeded, false otherwise meaning that there are
* other new operations pending for that table (e.g. a new create). * other new operations pending for that table (e.g. a new create).
*/ */
protected boolean markTableAsDeleted(final TableName table) { @VisibleForTesting
protected boolean markTableAsDeleted(final TableName table, final Procedure procedure) {
final ReentrantLock l = schedLock; final ReentrantLock l = schedLock;
l.lock(); l.lock();
try { try {
TableQueue queue = getTableQueue(table); TableQueue queue = getTableQueue(table);
if (queue == null) return true; if (queue == null) return true;
if (queue.isEmpty() && queue.tryExclusiveLock(0)) { if (queue.isEmpty() && queue.tryExclusiveLock(procedure)) {
// remove the table from the run-queue and the map // remove the table from the run-queue and the map
if (AvlIterableList.isLinked(queue)) { if (AvlIterableList.isLinked(queue)) {
tableRunQueue.remove(queue); tableRunQueue.remove(queue);
@ -1256,11 +1286,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
wakeProcedure(nextProcs[i]); wakeProcedure(nextProcs[i]);
} }
if (numProcs > 1) { wakePollIfNeeded(numProcs);
schedWaitCond.signalAll();
} else if (numProcs > 0) {
schedWaitCond.signal();
}
if (!procedure.hasParent()) { if (!procedure.hasParent()) {
// release the table shared-lock. // release the table shared-lock.
@ -1289,7 +1315,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (!tableQueue.trySharedLock()) return false; if (!tableQueue.trySharedLock()) return false;
NamespaceQueue nsQueue = getNamespaceQueue(nsName); NamespaceQueue nsQueue = getNamespaceQueue(nsName);
boolean hasLock = nsQueue.tryExclusiveLock(procedure.getProcId()); boolean hasLock = nsQueue.tryExclusiveLock(procedure);
if (!hasLock) { if (!hasLock) {
tableQueue.releaseSharedLock(); tableQueue.releaseSharedLock();
} }
@ -1333,7 +1359,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
schedLock.lock(); schedLock.lock();
try { try {
ServerQueue queue = getServerQueue(serverName); ServerQueue queue = getServerQueue(serverName);
if (queue.tryExclusiveLock(procedure.getProcId())) { if (queue.tryExclusiveLock(procedure)) {
removeFromRunQueue(serverRunQueue, queue); removeFromRunQueue(serverRunQueue, queue);
return true; return true;
} }
@ -1473,10 +1499,13 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
return proc.hasParent() && isLockOwner(proc.getParentProcId()); return proc.hasParent() && isLockOwner(proc.getParentProcId());
} }
public synchronized boolean tryExclusiveLock(final long procIdOwner) { public synchronized boolean hasLockAccess(final Procedure proc) {
assert procIdOwner != Long.MIN_VALUE; return isLockOwner(proc.getProcId()) || hasParentLock(proc);
if (isLocked() && !isLockOwner(procIdOwner)) return false; }
exclusiveLockProcIdOwner = procIdOwner;
public synchronized boolean tryExclusiveLock(final Procedure proc) {
if (isLocked()) return hasLockAccess(proc);
exclusiveLockProcIdOwner = proc.getProcId();
return true; return true;
} }
@ -1564,6 +1593,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
private Queue<T> currentQueue = null; private Queue<T> currentQueue = null;
private Queue<T> queueHead = null; private Queue<T> queueHead = null;
private int currentQuantum = 0; private int currentQuantum = 0;
private int size = 0;
public FairQueue() { public FairQueue() {
this(1); this(1);
@ -1573,9 +1603,14 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
this.quantum = quantum; this.quantum = quantum;
} }
public boolean hasRunnables() {
return size > 0;
}
public void add(Queue<T> queue) { public void add(Queue<T> queue) {
queueHead = AvlIterableList.append(queueHead, queue); queueHead = AvlIterableList.append(queueHead, queue);
if (currentQueue == null) setNextQueue(queueHead); if (currentQueue == null) setNextQueue(queueHead);
size++;
} }
public void remove(Queue<T> queue) { public void remove(Queue<T> queue) {
@ -1584,6 +1619,7 @@ public class MasterProcedureScheduler implements ProcedureRunnableSet {
if (currentQueue == queue) { if (currentQueue == queue) {
setNextQueue(queueHead != null ? nextQueue : null); setNextQueue(queueHead != null ? nextQueue : null);
} }
size--;
} }
public Queue<T> poll() { public Queue<T> poll() {

View File

@ -20,10 +20,7 @@ package org.apache.hadoop.hbase.master.procedure;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -37,8 +34,8 @@ import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@ -51,7 +48,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, MediumTests.class}) @Category({MasterTests.class, SmallTests.class})
public class TestMasterProcedureScheduler { public class TestMasterProcedureScheduler {
private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class); private static final Log LOG = LogFactory.getLog(TestMasterProcedureScheduler.class);
@ -70,60 +67,6 @@ public class TestMasterProcedureScheduler {
queue.clear(); queue.clear();
} }
@Test
public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureScheduler procQueue = queue;
final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false);
Thread createThread = new Thread() {
@Override
public void run() {
try {
TestTableProcedure proc = new TestTableProcedure(1, table,
TableProcedureInterface.TableOperationType.CREATE);
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
procQueue.releaseTableExclusiveLock(proc, table);
}
}
} catch (Throwable e) {
LOG.error("create failed", e);
failure.set(true);
}
}
};
Thread deleteThread = new Thread() {
@Override
public void run() {
try {
TestTableProcedure proc = new TestTableProcedure(2, table,
TableProcedureInterface.TableOperationType.DELETE);
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
procQueue.releaseTableExclusiveLock(proc, table);
}
procQueue.markTableAsDeleted(table);
}
} catch (Throwable e) {
LOG.error("delete failed", e);
failure.set(true);
}
}
};
createThread.start();
deleteThread.start();
for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
Thread.sleep(100);
}
running.set(false);
createThread.join();
deleteThread.join();
assertEquals(false, failure.get());
}
/** /**
* Verify simple create/insert/fetch/delete of the table queue. * Verify simple create/insert/fetch/delete of the table queue.
*/ */
@ -159,9 +102,11 @@ public class TestMasterProcedureScheduler {
assertEquals(0, queue.size()); assertEquals(0, queue.size());
for (int i = 1; i <= NUM_TABLES; ++i) { for (int i = 1; i <= NUM_TABLES; ++i) {
TableName tableName = TableName.valueOf(String.format("test-%04d", i)); final TableName tableName = TableName.valueOf(String.format("test-%04d", i));
final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
TableProcedureInterface.TableOperationType.DELETE);
// complete the table deletion // complete the table deletion
assertTrue(queue.markTableAsDeleted(tableName)); assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
} }
} }
@ -173,11 +118,14 @@ public class TestMasterProcedureScheduler {
public void testCreateDeleteTableOperationsWithWriteLock() throws Exception { public void testCreateDeleteTableOperationsWithWriteLock() throws Exception {
TableName tableName = TableName.valueOf("testtb"); TableName tableName = TableName.valueOf("testtb");
final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
TableProcedureInterface.TableOperationType.DELETE);
queue.addBack(new TestTableProcedure(1, tableName, queue.addBack(new TestTableProcedure(1, tableName,
TableProcedureInterface.TableOperationType.EDIT)); TableProcedureInterface.TableOperationType.EDIT));
// table can't be deleted because one item is in the queue // table can't be deleted because one item is in the queue
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// fetch item and take a lock // fetch item and take a lock
Procedure proc = queue.poll(); Procedure proc = queue.poll();
@ -186,11 +134,11 @@ public class TestMasterProcedureScheduler {
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName)); assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
// table can't be deleted because we have the lock // table can't be deleted because we have the lock
assertEquals(0, queue.size()); assertEquals(0, queue.size());
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// release the xlock // release the xlock
queue.releaseTableExclusiveLock(proc, tableName); queue.releaseTableExclusiveLock(proc, tableName);
// complete the table deletion // complete the table deletion
assertTrue(queue.markTableAsDeleted(tableName)); assertTrue(queue.markTableAsDeleted(tableName, proc));
} }
/** /**
@ -202,13 +150,16 @@ public class TestMasterProcedureScheduler {
final TableName tableName = TableName.valueOf("testtb"); final TableName tableName = TableName.valueOf("testtb");
final int nitems = 2; final int nitems = 2;
final TestTableProcedure dummyProc = new TestTableProcedure(100, tableName,
TableProcedureInterface.TableOperationType.DELETE);
for (int i = 1; i <= nitems; ++i) { for (int i = 1; i <= nitems; ++i) {
queue.addBack(new TestTableProcedure(i, tableName, queue.addBack(new TestTableProcedure(i, tableName,
TableProcedureInterface.TableOperationType.READ)); TableProcedureInterface.TableOperationType.READ));
} }
// table can't be deleted because one item is in the queue // table can't be deleted because one item is in the queue
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
Procedure[] procs = new Procedure[nitems]; Procedure[] procs = new Procedure[nitems];
for (int i = 0; i < nitems; ++i) { for (int i = 0; i < nitems; ++i) {
@ -218,12 +169,12 @@ public class TestMasterProcedureScheduler {
// take the rlock // take the rlock
assertTrue(queue.tryAcquireTableSharedLock(proc, tableName)); assertTrue(queue.tryAcquireTableSharedLock(proc, tableName));
// table can't be deleted because we have locks and/or items in the queue // table can't be deleted because we have locks and/or items in the queue
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
} }
for (int i = 0; i < nitems; ++i) { for (int i = 0; i < nitems; ++i) {
// table can't be deleted because we have locks // table can't be deleted because we have locks
assertFalse(queue.markTableAsDeleted(tableName)); assertFalse(queue.markTableAsDeleted(tableName, dummyProc));
// release the rlock // release the rlock
queue.releaseTableSharedLock(procs[i], tableName); queue.releaseTableSharedLock(procs[i], tableName);
} }
@ -231,7 +182,7 @@ public class TestMasterProcedureScheduler {
// there are no items and no lock in the queeu // there are no items and no lock in the queeu
assertEquals(0, queue.size()); assertEquals(0, queue.size());
// complete the table deletion // complete the table deletion
assertTrue(queue.markTableAsDeleted(tableName)); assertTrue(queue.markTableAsDeleted(tableName, dummyProc));
} }
/** /**
@ -299,7 +250,7 @@ public class TestMasterProcedureScheduler {
// remove table queue // remove table queue
assertEquals(0, queue.size()); assertEquals(0, queue.size());
assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName)); assertTrue("queue should be deleted", queue.markTableAsDeleted(tableName, wrProc));
} }
@Test @Test
@ -354,6 +305,32 @@ public class TestMasterProcedureScheduler {
assertEquals(4, procId); assertEquals(4, procId);
} }
@Test
public void testVerifyNamespaceXLock() throws Exception {
String nsName = "ns1";
TableName tableName = TableName.valueOf(nsName, "testtb");
queue.addBack(new TestNamespaceProcedure(1, nsName,
TableProcedureInterface.TableOperationType.CREATE));
queue.addBack(new TestTableProcedure(2, tableName,
TableProcedureInterface.TableOperationType.READ));
// Fetch the ns item and take the xlock
Procedure proc = queue.poll();
assertEquals(1, proc.getProcId());
assertEquals(true, queue.tryAcquireNamespaceExclusiveLock(proc, nsName));
// the table operation can't be executed because the ns is locked
assertEquals(null, queue.poll(0));
// release the ns lock
queue.releaseNamespaceExclusiveLock(proc, nsName);
proc = queue.poll();
assertEquals(2, proc.getProcId());
assertEquals(true, queue.tryAcquireTableExclusiveLock(proc, tableName));
queue.releaseTableExclusiveLock(proc, tableName);
}
@Test @Test
public void testSharedZkLock() throws Exception { public void testSharedZkLock() throws Exception {
final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@ -625,154 +602,80 @@ public class TestMasterProcedureScheduler {
assertEquals(null, queue.poll(0)); assertEquals(null, queue.poll(0));
} }
/** @Test
* Verify that "write" operations for a single table are serialized, public void testParentXLockAndChildrenSharedLock() throws Exception {
* but different tables can be executed in parallel. final TableName tableName = TableName.valueOf("testParentXLockAndChildrenSharedLock");
*/ final HRegionInfo[] regions = new HRegionInfo[] {
@Test(timeout=90000) new HRegionInfo(tableName, Bytes.toBytes("a"), Bytes.toBytes("b")),
public void testConcurrentWriteOps() throws Exception { new HRegionInfo(tableName, Bytes.toBytes("b"), Bytes.toBytes("c")),
final TestTableProcSet procSet = new TestTableProcSet(queue); new HRegionInfo(tableName, Bytes.toBytes("c"), Bytes.toBytes("d")),
final int NUM_ITEMS = 10;
final int NUM_TABLES = 4;
final AtomicInteger opsCount = new AtomicInteger(0);
for (int i = 0; i < NUM_TABLES; ++i) {
TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
for (int j = 1; j < NUM_ITEMS; ++j) {
procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
TableProcedureInterface.TableOperationType.EDIT));
opsCount.incrementAndGet();
}
}
assertEquals(opsCount.get(), queue.size());
final Thread[] threads = new Thread[NUM_TABLES * 2];
final HashSet<TableName> concurrentTables = new HashSet<TableName>();
final ArrayList<String> failures = new ArrayList<String>();
final AtomicInteger concurrentCount = new AtomicInteger(0);
for (int i = 0; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (opsCount.get() > 0) {
try {
Procedure proc = procSet.acquire();
if (proc == null) {
queue.signalAll();
if (opsCount.get() > 0) {
continue;
}
break;
}
TableName tableId = procSet.getTableName(proc);
synchronized (concurrentTables) {
assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
}
assertTrue(opsCount.decrementAndGet() >= 0);
try {
long procId = proc.getProcId();
int concurrent = concurrentCount.incrementAndGet();
assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
concurrent >= 1 && concurrent <= NUM_TABLES);
LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
Thread.sleep(2000);
concurrent = concurrentCount.decrementAndGet();
LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
} finally {
synchronized (concurrentTables) {
assertTrue(concurrentTables.remove(tableId));
}
procSet.release(proc);
}
} catch (Throwable e) {
LOG.error("Failed " + e.getMessage(), e);
synchronized (failures) {
failures.add(e.getMessage());
}
} finally {
queue.signalAll();
}
}
}
}; };
threads[i].start();
}
for (int i = 0; i < threads.length; ++i) {
threads[i].join();
}
assertTrue(failures.toString(), failures.isEmpty());
assertEquals(0, opsCount.get());
assertEquals(0, queue.size());
for (int i = 1; i <= NUM_TABLES; ++i) { queue.addBack(new TestTableProcedure(1, tableName,
TableName table = TableName.valueOf(String.format("testtb-%04d", i)); TableProcedureInterface.TableOperationType.CREATE));
assertTrue("queue should be deleted, table=" + table, queue.markTableAsDeleted(table));
} // fetch and acquire first xlock proc
Procedure parentProc = queue.poll();
assertEquals(1, parentProc.getProcId());
assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
// add child procedure
for (int i = 0; i < regions.length; ++i) {
queue.addFront(new TestRegionProcedure(1, 1 + i, tableName,
TableProcedureInterface.TableOperationType.ASSIGN, regions[i]));
} }
public static class TestTableProcSet { // add another xlock procedure (no parent)
private final MasterProcedureScheduler queue; queue.addBack(new TestTableProcedure(100, tableName,
TableProcedureInterface.TableOperationType.EDIT));
public TestTableProcSet(final MasterProcedureScheduler queue) { // fetch and execute child
this.queue = queue; for (int i = 0; i < regions.length; ++i) {
final int regionIdx = regions.length - i - 1;
Procedure childProc = queue.poll();
LOG.debug("fetch children " + childProc);
assertEquals(1 + regionIdx, childProc.getProcId());
assertEquals(false, queue.waitRegion(childProc, regions[regionIdx]));
queue.wakeRegion(childProc, regions[regionIdx]);
} }
public void addBack(Procedure proc) { // nothing available, until xlock release
queue.addBack(proc); assertEquals(null, queue.poll(0));
// release xlock
queue.releaseTableExclusiveLock(parentProc, tableName);
// fetch the other xlock proc
Procedure proc = queue.poll();
assertEquals(100, proc.getProcId());
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
queue.releaseTableExclusiveLock(proc, tableName);
} }
public void addFront(Procedure proc) { @Test
queue.addFront(proc); public void testParentXLockAndChildrenXLock() throws Exception {
} final TableName tableName = TableName.valueOf("testParentXLockAndChildrenXLock");
public Procedure acquire() { queue.addBack(new TestTableProcedure(1, tableName,
Procedure proc = null; TableProcedureInterface.TableOperationType.EDIT));
boolean avail = false;
while (!avail) {
proc = queue.poll();
if (proc == null) break;
switch (getTableOperationType(proc)) {
case CREATE:
case DELETE:
case EDIT:
avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
break;
case READ:
avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
break;
default:
throw new UnsupportedOperationException();
}
if (!avail) {
addFront(proc);
LOG.debug("yield procId=" + proc);
}
}
return proc;
}
public void release(Procedure proc) { // fetch and acquire first xlock proc
switch (getTableOperationType(proc)) { Procedure parentProc = queue.poll();
case CREATE: assertEquals(1, parentProc.getProcId());
case DELETE: assertTrue(queue.tryAcquireTableExclusiveLock(parentProc, tableName));
case EDIT:
queue.releaseTableExclusiveLock(proc, getTableName(proc));
break;
case READ:
queue.releaseTableSharedLock(proc, getTableName(proc));
break;
}
}
public TableName getTableName(Procedure proc) { // add child procedure
return ((TableProcedureInterface)proc).getTableName(); queue.addFront(new TestTableProcedure(1, 2, tableName,
} TableProcedureInterface.TableOperationType.EDIT));
public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) { // fetch the other xlock proc
return ((TableProcedureInterface)proc).getTableOperationType(); Procedure proc = queue.poll();
} assertEquals(2, proc.getProcId());
assertTrue(queue.tryAcquireTableExclusiveLock(proc, tableName));
queue.releaseTableExclusiveLock(proc, tableName);
// release xlock
queue.releaseTableExclusiveLock(parentProc, tableName);
} }
public static class TestTableProcedure extends TestProcedure public static class TestTableProcedure extends TestProcedure
@ -813,6 +716,19 @@ public class TestMasterProcedureScheduler {
} }
} }
public static class TestTableProcedureWithEvent extends TestTableProcedure {
private final ProcedureEvent event;
public TestTableProcedureWithEvent(long procId, TableName tableName, TableOperationType opType) {
super(procId, tableName, opType);
event = new ProcedureEvent(tableName + " procId=" + procId);
}
public ProcedureEvent getEvent() {
return event;
}
}
public static class TestRegionProcedure extends TestTableProcedure { public static class TestRegionProcedure extends TestTableProcedure {
private final HRegionInfo[] regionInfo; private final HRegionInfo[] regionInfo;
@ -839,7 +755,7 @@ public class TestMasterProcedureScheduler {
public void toStringClassDetails(final StringBuilder sb) { public void toStringClassDetails(final StringBuilder sb) {
sb.append(getClass().getSimpleName()); sb.append(getClass().getSimpleName());
sb.append(" (region="); sb.append(" (region=");
sb.append(getRegionInfo()); sb.append(Arrays.toString(getRegionInfo()));
sb.append(")"); sb.append(")");
} }
} }

View File

@ -0,0 +1,363 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.master.TableLockManager;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler.ProcedureEvent;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedure;
import org.apache.hadoop.hbase.master.procedure.TestMasterProcedureScheduler.TestTableProcedureWithEvent;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.TestProcedure;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@Category({MasterTests.class, MediumTests.class})
public class TestMasterProcedureSchedulerConcurrency {
private static final Log LOG = LogFactory.getLog(TestMasterProcedureSchedulerConcurrency.class);
private MasterProcedureScheduler queue;
private Configuration conf;
@Before
public void setUp() throws IOException {
conf = HBaseConfiguration.create();
queue = new MasterProcedureScheduler(conf, new TableLockManager.NullTableLockManager());
}
@After
public void tearDown() throws IOException {
assertEquals("proc-queue expected to be empty", 0, queue.size());
queue.clear();
}
@Test(timeout=60000)
public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureScheduler procQueue = queue;
final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false);
Thread createThread = new Thread() {
@Override
public void run() {
try {
TestTableProcedure proc = new TestTableProcedure(1, table,
TableProcedureInterface.TableOperationType.CREATE);
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
procQueue.releaseTableExclusiveLock(proc, table);
}
}
} catch (Throwable e) {
LOG.error("create failed", e);
failure.set(true);
}
}
};
Thread deleteThread = new Thread() {
@Override
public void run() {
try {
TestTableProcedure proc = new TestTableProcedure(2, table,
TableProcedureInterface.TableOperationType.DELETE);
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(proc, table)) {
procQueue.releaseTableExclusiveLock(proc, table);
}
procQueue.markTableAsDeleted(table, proc);
}
} catch (Throwable e) {
LOG.error("delete failed", e);
failure.set(true);
}
}
};
createThread.start();
deleteThread.start();
for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
Thread.sleep(100);
}
running.set(false);
createThread.join();
deleteThread.join();
assertEquals(false, failure.get());
}
/**
* Verify that "write" operations for a single table are serialized,
* but different tables can be executed in parallel.
*/
@Test(timeout=60000)
public void testConcurrentWriteOps() throws Exception {
final TestTableProcSet procSet = new TestTableProcSet(queue);
final int NUM_ITEMS = 10;
final int NUM_TABLES = 4;
final AtomicInteger opsCount = new AtomicInteger(0);
for (int i = 0; i < NUM_TABLES; ++i) {
TableName tableName = TableName.valueOf(String.format("testtb-%04d", i));
for (int j = 1; j < NUM_ITEMS; ++j) {
procSet.addBack(new TestTableProcedure(i * 100 + j, tableName,
TableProcedureInterface.TableOperationType.EDIT));
opsCount.incrementAndGet();
}
}
assertEquals(opsCount.get(), queue.size());
final Thread[] threads = new Thread[NUM_TABLES * 2];
final HashSet<TableName> concurrentTables = new HashSet<TableName>();
final ArrayList<String> failures = new ArrayList<String>();
final AtomicInteger concurrentCount = new AtomicInteger(0);
for (int i = 0; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (opsCount.get() > 0) {
try {
Procedure proc = procSet.acquire();
if (proc == null) {
queue.signalAll();
if (opsCount.get() > 0) {
continue;
}
break;
}
TableName tableId = procSet.getTableName(proc);
synchronized (concurrentTables) {
assertTrue("unexpected concurrency on " + tableId, concurrentTables.add(tableId));
}
assertTrue(opsCount.decrementAndGet() >= 0);
try {
long procId = proc.getProcId();
int concurrent = concurrentCount.incrementAndGet();
assertTrue("inc-concurrent="+ concurrent +" 1 <= concurrent <= "+ NUM_TABLES,
concurrent >= 1 && concurrent <= NUM_TABLES);
LOG.debug("[S] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
Thread.sleep(2000);
concurrent = concurrentCount.decrementAndGet();
LOG.debug("[E] tableId="+ tableId +" procId="+ procId +" concurrent="+ concurrent);
assertTrue("dec-concurrent=" + concurrent, concurrent < NUM_TABLES);
} finally {
synchronized (concurrentTables) {
assertTrue(concurrentTables.remove(tableId));
}
procSet.release(proc);
}
} catch (Throwable e) {
LOG.error("Failed " + e.getMessage(), e);
synchronized (failures) {
failures.add(e.getMessage());
}
} finally {
queue.signalAll();
}
}
}
};
threads[i].start();
}
for (int i = 0; i < threads.length; ++i) {
threads[i].join();
}
assertTrue(failures.toString(), failures.isEmpty());
assertEquals(0, opsCount.get());
assertEquals(0, queue.size());
for (int i = 1; i <= NUM_TABLES; ++i) {
final TableName table = TableName.valueOf(String.format("testtb-%04d", i));
final TestTableProcedure dummyProc = new TestTableProcedure(100, table,
TableProcedureInterface.TableOperationType.DELETE);
assertTrue("queue should be deleted, table=" + table,
queue.markTableAsDeleted(table, dummyProc));
}
}
@Test(timeout=60000)
public void testConcurrentWaitWake() throws Exception {
testConcurrentWaitWake(false);
}
@Test(timeout=60000)
public void testConcurrentWaitWakeBatch() throws Exception {
testConcurrentWaitWake(true);
}
private void testConcurrentWaitWake(final boolean useWakeBatch) throws Exception {
final TableName tableName = TableName.valueOf("testtb");
final int NPROCS = 20;
final int NRUNS = 100;
for (long i = 0; i < NPROCS; ++i) {
queue.addBack(new TestTableProcedureWithEvent(i, tableName,
TableProcedureInterface.TableOperationType.READ));
}
final Thread[] threads = new Thread[4];
final AtomicInteger waitCount = new AtomicInteger(0);
final AtomicInteger wakeCount = new AtomicInteger(0);
final ConcurrentSkipListSet<TestTableProcedureWithEvent> waitQueue =
new ConcurrentSkipListSet<TestTableProcedureWithEvent>();
threads[0] = new Thread() {
@Override
public void run() {
while (true) {
if (useWakeBatch) {
ProcedureEvent[] ev = new ProcedureEvent[waitQueue.size()];
for (int i = 0; i < ev.length; ++i) {
ev[i] = waitQueue.pollFirst().getEvent();
LOG.debug("WAKE " + ev[i] + " total=" + wakeCount.get());
}
queue.wakeEvents(ev, ev.length);
wakeCount.addAndGet(ev.length);
} else {
int size = waitQueue.size();
while (size-- > 0) {
ProcedureEvent ev = waitQueue.pollFirst().getEvent();
queue.wakeEvent(ev);
LOG.debug("WAKE " + ev + " total=" + wakeCount.get());
wakeCount.incrementAndGet();
}
}
if (wakeCount.get() >= NRUNS) {
break;
}
Threads.sleepWithoutInterrupt(25);
}
}
};
for (int i = 1; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (true) {
TestTableProcedureWithEvent proc = (TestTableProcedureWithEvent)queue.poll();
if (proc == null) continue;
waitQueue.add(proc);
queue.suspendEvent(proc.getEvent());
queue.waitEvent(proc.getEvent(), proc);
LOG.debug("WAIT " + proc.getEvent());
if (waitCount.incrementAndGet() >= NRUNS) {
break;
}
}
}
};
}
for (int i = 0; i < threads.length; ++i) {
threads[i].start();
}
for (int i = 0; i < threads.length; ++i) {
threads[i].join();
}
queue.clear();
}
public static class TestTableProcSet {
private final MasterProcedureScheduler queue;
public TestTableProcSet(final MasterProcedureScheduler queue) {
this.queue = queue;
}
public void addBack(Procedure proc) {
queue.addBack(proc);
}
public void addFront(Procedure proc) {
queue.addFront(proc);
}
public Procedure acquire() {
Procedure proc = null;
boolean avail = false;
while (!avail) {
proc = queue.poll();
if (proc == null) break;
switch (getTableOperationType(proc)) {
case CREATE:
case DELETE:
case EDIT:
avail = queue.tryAcquireTableExclusiveLock(proc, getTableName(proc));
break;
case READ:
avail = queue.tryAcquireTableSharedLock(proc, getTableName(proc));
break;
default:
throw new UnsupportedOperationException();
}
if (!avail) {
addFront(proc);
LOG.debug("yield procId=" + proc);
}
}
return proc;
}
public void release(Procedure proc) {
switch (getTableOperationType(proc)) {
case CREATE:
case DELETE:
case EDIT:
queue.releaseTableExclusiveLock(proc, getTableName(proc));
break;
case READ:
queue.releaseTableSharedLock(proc, getTableName(proc));
break;
}
}
public TableName getTableName(Procedure proc) {
return ((TableProcedureInterface)proc).getTableName();
}
public TableProcedureInterface.TableOperationType getTableOperationType(Procedure proc) {
return ((TableProcedureInterface)proc).getTableOperationType();
}
}
}