diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
index af9eecf0e79..c4c774743b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureQueue.java
@@ -67,7 +67,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
* server that was carrying meta should rise to the top of the queue (this is how it used to
* work when we had handlers and ServerShutdownHandler ran). TODO: special handling of servers
* that were carrying system tables on crash; do I need to have these servers have priority?
- *
+ *
*
Apart from the special-casing of meta and system tables, fairq is what we want
*/
private final ProcedureFairRunQueues serverFairQ;
@@ -366,7 +366,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
if (queue != null) {
lock.lock();
try {
- if (queue.isEmpty() && !queue.isLocked()) {
+ if (queue.isEmpty() && queue.acquireDeleteLock()) {
tableFairQ.remove(table);
// Remove the table lock
@@ -390,7 +390,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
void addFront(Procedure proc);
void addBack(Procedure proc);
Long poll();
- boolean isLocked();
+ boolean acquireDeleteLock();
}
/**
@@ -443,6 +443,10 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
}
@Override
+ public synchronized boolean acquireDeleteLock() {
+ return tryExclusiveLock();
+ }
+
public synchronized boolean isLocked() {
return isExclusiveLock() || sharedLock > 0;
}
@@ -477,7 +481,7 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
public synchronized void releaseExclusiveLock() {
exclusiveLock = false;
}
-
+
@Override
public String toString() {
return this.runnables.toString();
@@ -571,4 +575,4 @@ public class MasterProcedureQueue implements ProcedureRunnableSet {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
index 349fa8ea1f7..68384ce7f4d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestMasterProcedureQueue.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -62,6 +63,56 @@ public class TestMasterProcedureQueue {
assertEquals(0, queue.size());
}
+ @Test
+ public void testConcurrentCreateDelete() throws Exception {
+ final MasterProcedureQueue procQueue = queue;
+ final TableName table = TableName.valueOf("testtb");
+ final AtomicBoolean running = new AtomicBoolean(true);
+ final AtomicBoolean failure = new AtomicBoolean(false);
+ Thread createThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while (running.get() && !failure.get()) {
+ if (procQueue.tryAcquireTableExclusiveLock(table, "create")) {
+ procQueue.releaseTableExclusiveLock(table);
+ }
+ }
+ } catch (Throwable e) {
+ LOG.error("create failed", e);
+ failure.set(true);
+ }
+ }
+ };
+
+ Thread deleteThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ while (running.get() && !failure.get()) {
+ if (procQueue.tryAcquireTableExclusiveLock(table, "delete")) {
+ procQueue.releaseTableExclusiveLock(table);
+ }
+ procQueue.markTableAsDeleted(table);
+ }
+ } catch (Throwable e) {
+ LOG.error("delete failed", e);
+ failure.set(true);
+ }
+ }
+ };
+
+ createThread.start();
+ deleteThread.start();
+ for (int i = 0; i < 100 && running.get() && !failure.get(); ++i) {
+ Thread.sleep(100);
+ }
+ running.set(false);
+ createThread.join();
+ deleteThread.join();
+ assertEquals(false, failure.get());
+ }
+
/**
* Verify simple create/insert/fetch/delete of the table queue.
*/