HBASE-14017 Procedure v2 - MasterProcedureQueue fix concurrency issue on table queue deletion

Signed-off-by: Sean Busbey <busbey@apache.org>
This commit is contained in:
Matteo Bertozzi 2015-07-06 11:36:20 -07:00 committed by Sean Busbey
parent 608c3aa15c
commit 1713f1fcaf
2 changed files with 60 additions and 5 deletions

View File

@ -67,7 +67,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
* server that was carrying meta should rise to the top of the queue (this is how it used to * server that was carrying meta should rise to the top of the queue (this is how it used to
* work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers * work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
* that were carrying system tables on crash; do I need to have these servers have priority? * that were carrying system tables on crash; do I need to have these servers have priority?
* *
* <p>Apart from the special-casing of meta and system tables, fairq is what we want * <p>Apart from the special-casing of meta and system tables, fairq is what we want
*/ */
private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ; private final ProcedureFairRunQueues<ServerName, RunQueue> serverFairQ;
@ -366,7 +366,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
if (queue != null) { if (queue != null) {
lock.lock(); lock.lock();
try { try {
if (queue.isEmpty() && !queue.isLocked()) { if (queue.isEmpty() && queue.acquireDeleteLock()) {
tableFairQ.remove(table); tableFairQ.remove(table);
// Remove the table lock // Remove the table lock
@ -390,7 +390,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
void addFront(Procedure proc); void addFront(Procedure proc);
void addBack(Procedure proc); void addBack(Procedure proc);
Long poll(); Long poll();
boolean isLocked(); boolean acquireDeleteLock();
} }
/** /**
@ -443,6 +443,10 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
} }
@Override @Override
public synchronized boolean acquireDeleteLock() {
return tryExclusiveLock();
}
public synchronized boolean isLocked() { public synchronized boolean isLocked() {
return isExclusiveLock() || sharedLock > 0; return isExclusiveLock() || sharedLock > 0;
} }
@ -477,7 +481,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
public synchronized void releaseExclusiveLock() { public synchronized void releaseExclusiveLock() {
exclusiveLock = false; exclusiveLock = false;
} }
@Override @Override
public String toString() { public String toString() {
return this.runnables.toString(); return this.runnables.toString();
@ -571,4 +575,4 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
} }
} }
} }
} }

View File

@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -66,6 +67,56 @@ public class TestMasterProcedureQueue {
assertEquals(0, queue.size()); assertEquals(0, queue.size());
} }
@Test
public void testConcurrentCreateDelete() throws Exception {
final MasterProcedureQueue procQueue = queue;
final TableName table = TableName.valueOf("testtb");
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failure = new AtomicBoolean(false);
Thread createThread = new Thread() {
@Override
public void run() {
try {
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
procQueue.releaseTableExclusiveLock(table);
}
}
} catch (Throwable e) {
LOG.error("create failed", e);
failure.set(true);
}
}
};
Thread deleteThread = new Thread() {
@Override
public void run() {
try {
while (running.get() && !failure.get()) {
if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
procQueue.releaseTableExclusiveLock(table);
}
procQueue.markTableAsDeleted(table);
}
} catch (Throwable e) {
LOG.error("delete failed", e);
failure.set(true);
}
}
};
createThread.start();
deleteThread.start();
for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
Thread.sleep(100);
}
running.set(false);
createThread.join();
deleteThread.join();
assertEquals(false, failure.get());
}
/** /**
* Verify simple create/insert/fetch/delete of the table queue. * Verify simple create/insert/fetch/delete of the table queue.
*/ */